Unscented Kalman filter implementation

from numpy.random import randn
import pyproj
import matplotlib.pyplot as plt
import numpy as np
import cv2
import os
import math
import xml.etree.ElementTree as ET
import pandas as pd
from filterpy.kalman import MerweScaledSigmaPoints
from filterpy.kalman import UnscentedKalmanFilter as UKF
from filterpy.common import Q_discrete_white_noise
from Functions import fx, hx

# Define the coordinate reference systems
src_crs = "EPSG:4326"  # WGS84
dst_crs = "EPSG:32632"  # UTM Zone 32N

# Create the coordinate transformation object
transformer = pyproj.Transformer.from_crs(src_crs, dst_crs, always_xy=True)

# Directories
filename = "Desktop/LABSF/Data/dataset.csv"
directory = "RGB/2022-07-15_08-58-03/"
radar_folder = "Desktop/LABSF/Data/Radar/XBR_imgs/"
xml_file = "Desktop/LABSF/annotations.xml"
image_directory = "Desktop/LABSF/Data/RGB/5min/"

# Timestamps interval
timestamp_1 = 1657876048.9269060
timestamp_2 = 1657876321.1003320

# Read the CSV file into a DataFrame
df = pd.read_csv(filename)

# Extract the directory column
image_column = df["rgb2"]
radar_column = df["image_data"]
xbr_max_column = df["xbr_max_range"]
latitude_column = df["latitude"]
longitude_column = df["longitude"]
heading_column = df["heading"]

# Create a list to store the filtered data
filtered_data = []

# Iterate over each directory path
for image, radar, xbr_max, lat, lon, heading in zip(image_column,radar_column,xbr_max_column,latitude_column,longitude_column,heading_column):
    # Extract the filename from the image directory path
    image_filename = os.path.basename(image)
    timestamp = float(os.path.splitext(image_filename)[0])

    # Check if the timestamp falls within the specified range
    if timestamp_1 <= timestamp <= timestamp_2:
        # Extract the filename from the radar directory path
        radar_filename = os.path.basename(radar)

        # Create the full paths for the image and radar files
        image_path = os.path.join(image_directory, image_filename)
        radar_path = os.path.join(radar_folder, radar_filename)

        # Check if the image and radar files exist
        if os.path.isfile(image_path) and os.path.isfile(radar_path):
            # Create dictionaries to store the data
            data = {
                "Image_path": image_path,
                "Radar_path": radar_path,
                "xbr_max_range": xbr_max,
                "Latitude": lat,
                "Longitude": lon,
                "Heading": heading,
            }

            # Append the data to the filtered_data list
            filtered_data.append(data)

# Display the filtered data
for data in filtered_data:
    print("Image:", data["Image_path"])
    print("Radar:", data["Radar_path"])
    print("xbr_max_range:", data["xbr_max_range"])
    print("Latitude:", data["Latitude"])
    print("Longitude:", data["Longitude"])
    print("Heading:", data["Heading"])

# Create a list to store the boat positions in the NE frame
boat_positions = []

# Iterate over each data point
for data in filtered_data:
    # Extract the latitude, longitude, and heading from the data
    lat = data["Latitude"]
    lon = data["Longitude"]
    heading = data["Heading"]

    # Convert latitude and longitude to UTM coordinates
    utm_coords = transformer.transform(lon, lat)

    # Calculate the position in the NE frame
    # Assuming altitude is 0
    x = 0.0
    y = 0.0
    angle = math.radians(
        90 - heading
    )  # Convert heading to radians and convert to NE frame

    # Calculate the position in the NE frame
    x_ne = x + math.cos(angle) * utm_coords[0] - math.sin(angle) * utm_coords[1]
    y_ne = y + math.sin(angle) * utm_coords[0] + math.cos(angle) * utm_coords[1]

    # Create a dictionary to store the boat position
    boat_position = {
        "Latitude": lat,
        "Longitude": lon,
        "Heading": heading,
        "NE_X": x_ne,
        "NE_Y": y_ne,
    }

    # Append the boat position to the list
    boat_positions.append(boat_position)

# Display the boat positions in the NE frame
for position in boat_positions:
    print("Latitude:", position["Latitude"])
    print("Longitude:", position["Longitude"])
    print("Heading:", position["Heading"])
    print("NE_X:", position["NE_X"])
    print("NE_Y:", position["NE_Y"])
    print()

# Create lists to store the NE coordinates
ne_x_coords = []
ne_y_coords = []

# Iterate over each data point
for position in boat_positions:
    ne_x_coords.append(position["NE_X"])
    ne_y_coords.append(position["NE_Y"])

# Plot the boat's movement in the NE frame
plt.plot(ne_x_coords, ne_y_coords, marker='o')
plt.xlabel('NE_X')
plt.ylabel('NE_Y')
plt.title('Boat Movement in NE Frame')
plt.grid(True)
plt.show()

# Read the XML file
tree = ET.parse(xml_file)
root = tree.getroot()

# Camera calibration
camera_yaw = 80
camera_yaw_rad = math.radians(camera_yaw)
camera_fov_w = 1.802212
camera_fov_h = 1.5708
image_width = 1920
image_height = 1080

# Calculate the horizontal and vertical center of the image
center_x = image_width / 2
center_y = image_height / 2

# Create empty dictionary to store image data
image_data = {}

### ITERATE OVER THE IMAGE ELEMENTS IN THE XML FILE ###
for data in filtered_data:
    # Extract the image filename from the Image_path
    _, image_path_filename = os.path.split(data["Image_path"])
    
    # Find the corresponding image element in the XML file
    image_element = root.find(f".//image[@name='{image_path_filename}']")
    
    # Check if the image element exists
    if image_element is None:
        print("Image element not found for:", image_path_filename)
        continue
    
    # Get the filename of the image from the image element
    name = image_element.get("name")
    
    # Check if the image file exists
    if not os.path.isfile(data["Image_path"]):
        print("Image file does not exist:", data["Image_path"])

    else:
        # Load the image
        image = cv2.imread(data["Image_path"])
        
        # Check if the image was loaded successfully
        if image is None:
            print("Failed to load the image:", data["Image_path"])

        else:
            # Create empty list to store box data for the image
            box_data_image = []
            
            # Iterate over the box elements in the image element
            for box_element in image_element.iter("box"):
                # Get the label of the box
                box_label = box_element.get("label")

                # Extract the bounding box coordinates
                xmin = int(float(box_element.get("xtl")))
                ymin = int(float(box_element.get("ytl")))
                xmax = int(float(box_element.get("xbr")))
                ymax = int(float(box_element.get("ybr")))
                
                # Draw the bounding box on the image
                cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
                
                # Display the label text near the bounding box
                label_text = f"{box_label}"
                label_text = f"{box_label.replace('Bouy', 'Buoy')}"
                cv2.putText(
                    image,
                    label_text,
                    (xmin, ymin - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9,
                    (0, 255, 0),
                    2,
                )
                
                # Calculate the center of the box
                box_center_x = (xmin + xmax) / 2

                # Calculate the normalized coordinates (-1 to 1) based on the image size and center
                normalized_x = (box_center_x - center_x) / image_width * camera_fov_w
                
                # Transform the angle into the radar frame summing the yaw angle of rotation between the camera and the radar frame
                box_bearing = normalized_x - camera_yaw_rad

                # Store the box data in the box_data list
                # Add box data to the box_data_image list
                box_data_image.append(
                    {
                        "bearing": box_bearing,
                        "label": box_label,
                    }
                )
            # Add the box_data_image to the image_data dictionary with image name as key
            image_data[name] = box_data_image
        
        # Display the image with the bounding boxes and annotations
        cv2.imshow('Image with Annotations', image)
        cv2.waitKey(delay=1)

cv2.destroyAllWindows()

### ITERATE OVER THE RADAR ELEMENTS IN THE XML FILE ###
for data in filtered_data:
    # Load the radar image
    radar_image_path = data["Radar_path"]
    radar_image = cv2.imread(radar_image_path)

    # Get the image name of the corresponding camera image
    image_name = os.path.basename(data["Image_path"])

    # Check if the image name exists in the image_data dictionary
    if image_name in image_data:
        # Get the box data associated with the image
        box_data = image_data[image_name]

        # Draw bearing lines and highlight points on the same side of the lines
        for box in box_data:
            bearing = box["bearing"]
            # Calculate the center point of the radar image
            center_x = radar_image.shape[1] // 2
            center_y = radar_image.shape[0] // 2

            # Calculate the line length based on the width of the radar image
            line_length = int(0.4 * radar_image.shape[1])

            # Calculate the endpoint coordinates of the line
            end_x = center_x + int(line_length * math.cos(bearing))
            end_y = center_y + int(line_length * math.sin(bearing))
            # Draw a line from the center to the endpoint
            cv2.line(radar_image, (center_x, center_y), (end_x, end_y), (0, 255, 0), 2)

            # Find the white spots in the radar image
            white_spots = np.where(radar_image[:, :, 2] > 200)

            # Calculate the distances between the white spots and the bearing line
            distances = np.abs(
                (end_y - center_y) * white_spots[1]
                - (end_x - center_x) * white_spots[0]
                + end_x * center_y
                - end_y * center_x
            ) / np.sqrt((end_y - center_y) ** 2 + (end_x - center_x) ** 2)

            # Threshold for considering a point on the bearing line
            threshold = 5  # Adjust as needed

            # Initialize variables to keep track of the last highlighted point
            last_highlighted_point = None
            highlighted_points = []
            
            # Highlight the points on the same side of the line
            for spot_y, spot_x, distance in zip(white_spots[0], white_spots[1], distances):
                dot_product = (spot_x - center_x) * (end_x - center_x) + (spot_y - center_y) * (end_y - center_y)
                if distance <= threshold and dot_product >= 0:
                    # Check if the point is different from the last highlighted point
                    if (spot_x, spot_y) != last_highlighted_point:
                        cv2.circle(radar_image, (spot_x, spot_y), 1, (0, 0, 255), -1)
                        highlighted_points.append((spot_x, spot_y))
                        last_highlighted_point = (spot_x, spot_y)

            # Extract the second highlighted point or the point following the last bearing
            if len(highlighted_points) >= 2:
                second_point = highlighted_points[1]
            elif len(highlighted_points) == 1:
                second_point = highlighted_points[0]
            else:
                # No highlighted points, set a default value
                second_point = None

            if second_point:
                # Extract max range for the radar image
                max_range = data["xbr_max_range"]

                # Calculate the distance of the second highlighted point
                x, y = second_point
                distance = (
                    max_range
                    * math.sqrt((x - center_x) ** 2 + (y - center_y) ** 2)
                    / line_length
                )

                # Store the distance in the box data
                box.setdefault("distance", []).append(distance)

        # Update the box data in the image_data dictionary
        image_data[image_name] = box_data

        # Display the radar image
        cv2.imshow('Radar Image with Points on Bearing Line', radar_image)
        cv2.waitKey(0)
    else:
        print("Image name not found in image_data:", image_name)

cv2.destroyAllWindows()

# Get the first image name from the image_data dictionary
first_image_name = next(iter(image_data.keys()))

# Get the box data for the first image
first_image_box_data = image_data[first_image_name]

# Initialize empty lists for bearings, labels, and distances
bearings = []
labels = []
distances = []

# Iterate over each box in the first image's box data
for box in first_image_box_data:
    # Check if the box has a 'bearing' and 'label' key
    if "bearing" in box and "label" in box:
        # Append the bearing and label to the respective lists
        bearings.append(box["bearing"])
        labels.append(box["label"])

        # Check if the box has a 'distance' key
        if "distance" in box:
            # Append the distance to the distances list
            distances.append(
                box["distance"][0]
            )  # Assuming there's only one distance value per box

# Create a dictionary with the extracted data for the first image
initial = {"bearing": bearings[0], "label": labels[0], "distance": distances[0]}

# Print the result dictionary
print(initial)

### UKF ###
# parameters
initial_bearing = bearings[0]  # First bearing
initial_distance = distances[0]  # First distance

# Convert bearing and distance to x and y coordinates
initial_x = initial_distance * np.cos(initial_bearing)
initial_y = initial_distance * np.sin(initial_bearing)


x = np.array([initial["bearing"], initial["distance"], 0, 0])  # initial state
P = np.diag([1, 1])  # initial uncertainty

points = MerweScaledSigmaPoints(n=2, alpha=1e-3, beta=2, kappa=0)
ukf = UKF(dim_x=2, dim_z=2, dt=1.5, fx=fx, hx=hx, points=points)

ukf.x = x  # initial state
ukf.P = P  # initial uncertainty
z_std1 = np.radians(5)  # degrees
z_std2 = 20  # meters
ukf.R = np.diag([z_std1**2, z_std2**2])  # measurement noise covariance matrix
ukf.Q = Q_discrete_white_noise(dim=2, var=0.01**2, dt=1.5, block_size=2)  # process noise covariance matrix

zs = [[data[1], data[3]] for data in image_data]
for i in range(1, len(boat_position)):
    z = zs[i]
    ukf.predict()
    ukf.update(z, boat_position[i])  # Pass the boat's position for the i-th measurement
    print(ukf.x, 'log-likelihood', ukf.log_likelihood)```

I have some issue implementing the UKF, some ideas?

Please Tell us what’s happening in your own words.

Learning to describe problems is hard, but it is an important part of learning how to code.

Also, the more you say, the more we can help!

Sure, What I did is to take the information from the camera sensor and identify with bounding boxes the objects: Buoys or Boats. I calculated than their bearings referred to the camera. Furthermore I used the bearing to identify the objects into the radar images. However the distance obtained could not be accurate, as a matter of fact the radar doesn’t tell me what object I am looking at. In conclusion with all the measurements of bearings and distance I wanted to implement the UKF in my code, in order to estimate the objects and than compare the resoults with the measurements. The algorithm is not on-line, but based on a preorder set of frames.

Are you looking for some sort of help with this code?

1 Like

Yes, I am a bit lost in the end of it. I have no idea if my implementation is correct, and if there are other things that i need to add

are you able to help me? I have no rush, but if you need more info lmk

I have this code:

def fx(x, dt):
    # state transition functioin - predict next state based
    # on constant velocity model x = vt + x_0
    F = np.array([[1, 0, dt, 0],
                  [0, 1, 0, dt]])
    return np.dot(F,x)

def hx(x):
    # Extract the relative NE_X and NE_Y coordinates from the state vector x
    x, y = x[0], x[1]
    
    # Calculate distance
    distance = math.sqrt((x)**2 + (y)**2)

    # Calculate bearing in radians
    bearing = math.atan2(y, x)

    return np.array([[bearing], [distance]])

### UKF ###
### Preparation of the data ###
first_bearing = bearings[0]
first_distance = distances[0]

# Calculate the position of the object related to the boat frame
relative_ne_x = first_distance * math.cos(first_bearing)
relative_ne_y = first_distance * math.sin(first_bearing)

### Define the initial values and the functions ###

# Store the values in variable x if needed
x = np.array([relative_ne_x, relative_ne_y]) # Initial state
P = np.diag([1, 1])  # initial uncertainty

However, I get this error, even if everything seems correct:

Traceback (most recent call last):
  File "Desktop/LABSF/Main.py", line 354, in <module>
    ukf.predict()
  File "Desktop/LABSF/env/lib/python3.9/site-packages/filterpy/kalman/UKF.py", line 388, in predict
    self.compute_process_sigmas(dt, fx, **fx_args)
  File "Desktop/LABSF/env/lib/python3.9/site-packages/filterpy/kalman/UKF.py", line 503, in compute_process_sigmas
    self.sigmas_f[i] = fx(s, dt, **fx_args)
TypeError: fx() takes 1 positional argument but 2 were given

I tried to modify all the code increasing the number of initial states with the x and y velocities, but the problem remains, instead of having 1 and passing 2 I will have 2 and passing 4.

This topic was automatically closed 182 days after the last reply. New replies are no longer allowed.