from numpy.random import randn
import pyproj
import matplotlib.pyplot as plt
import numpy as np
import cv2
import os
import math
import xml.etree.ElementTree as ET
import pandas as pd
from filterpy.kalman import MerweScaledSigmaPoints
from filterpy.kalman import UnscentedKalmanFilter as UKF
from filterpy.common import Q_discrete_white_noise
from Functions import fx, hx
# Define the coordinate reference systems
src_crs = "EPSG:4326" # WGS84
dst_crs = "EPSG:32632" # UTM Zone 32N
# Create the coordinate transformation object
transformer = pyproj.Transformer.from_crs(src_crs, dst_crs, always_xy=True)
# Directories
filename = "Desktop/LABSF/Data/dataset.csv"
directory = "RGB/2022-07-15_08-58-03/"
radar_folder = "Desktop/LABSF/Data/Radar/XBR_imgs/"
xml_file = "Desktop/LABSF/annotations.xml"
image_directory = "Desktop/LABSF/Data/RGB/5min/"
# Timestamps interval
timestamp_1 = 1657876048.9269060
timestamp_2 = 1657876321.1003320
# Read the CSV file into a DataFrame
df = pd.read_csv(filename)
# Extract the directory column
image_column = df["rgb2"]
radar_column = df["image_data"]
xbr_max_column = df["xbr_max_range"]
latitude_column = df["latitude"]
longitude_column = df["longitude"]
heading_column = df["heading"]
# Create a list to store the filtered data
filtered_data = []
# Iterate over each directory path
for image, radar, xbr_max, lat, lon, heading in zip(image_column,radar_column,xbr_max_column,latitude_column,longitude_column,heading_column):
# Extract the filename from the image directory path
image_filename = os.path.basename(image)
timestamp = float(os.path.splitext(image_filename)[0])
# Check if the timestamp falls within the specified range
if timestamp_1 <= timestamp <= timestamp_2:
# Extract the filename from the radar directory path
radar_filename = os.path.basename(radar)
# Create the full paths for the image and radar files
image_path = os.path.join(image_directory, image_filename)
radar_path = os.path.join(radar_folder, radar_filename)
# Check if the image and radar files exist
if os.path.isfile(image_path) and os.path.isfile(radar_path):
# Create dictionaries to store the data
data = {
"Image_path": image_path,
"Radar_path": radar_path,
"xbr_max_range": xbr_max,
"Latitude": lat,
"Longitude": lon,
"Heading": heading,
}
# Append the data to the filtered_data list
filtered_data.append(data)
# Display the filtered data
for data in filtered_data:
print("Image:", data["Image_path"])
print("Radar:", data["Radar_path"])
print("xbr_max_range:", data["xbr_max_range"])
print("Latitude:", data["Latitude"])
print("Longitude:", data["Longitude"])
print("Heading:", data["Heading"])
# Create a list to store the boat positions in the NE frame
boat_positions = []
# Iterate over each data point
for data in filtered_data:
# Extract the latitude, longitude, and heading from the data
lat = data["Latitude"]
lon = data["Longitude"]
heading = data["Heading"]
# Convert latitude and longitude to UTM coordinates
utm_coords = transformer.transform(lon, lat)
# Calculate the position in the NE frame
# Assuming altitude is 0
x = 0.0
y = 0.0
angle = math.radians(
90 - heading
) # Convert heading to radians and convert to NE frame
# Calculate the position in the NE frame
x_ne = x + math.cos(angle) * utm_coords[0] - math.sin(angle) * utm_coords[1]
y_ne = y + math.sin(angle) * utm_coords[0] + math.cos(angle) * utm_coords[1]
# Create a dictionary to store the boat position
boat_position = {
"Latitude": lat,
"Longitude": lon,
"Heading": heading,
"NE_X": x_ne,
"NE_Y": y_ne,
}
# Append the boat position to the list
boat_positions.append(boat_position)
# Display the boat positions in the NE frame
for position in boat_positions:
print("Latitude:", position["Latitude"])
print("Longitude:", position["Longitude"])
print("Heading:", position["Heading"])
print("NE_X:", position["NE_X"])
print("NE_Y:", position["NE_Y"])
print()
# Create lists to store the NE coordinates
ne_x_coords = []
ne_y_coords = []
# Iterate over each data point
for position in boat_positions:
ne_x_coords.append(position["NE_X"])
ne_y_coords.append(position["NE_Y"])
# Plot the boat's movement in the NE frame
plt.plot(ne_x_coords, ne_y_coords, marker='o')
plt.xlabel('NE_X')
plt.ylabel('NE_Y')
plt.title('Boat Movement in NE Frame')
plt.grid(True)
plt.show()
# Read the XML file
tree = ET.parse(xml_file)
root = tree.getroot()
# Camera calibration
camera_yaw = 80
camera_yaw_rad = math.radians(camera_yaw)
camera_fov_w = 1.802212
camera_fov_h = 1.5708
image_width = 1920
image_height = 1080
# Calculate the horizontal and vertical center of the image
center_x = image_width / 2
center_y = image_height / 2
# Create empty dictionary to store image data
image_data = {}
### ITERATE OVER THE IMAGE ELEMENTS IN THE XML FILE ###
for data in filtered_data:
# Extract the image filename from the Image_path
_, image_path_filename = os.path.split(data["Image_path"])
# Find the corresponding image element in the XML file
image_element = root.find(f".//image[@name='{image_path_filename}']")
# Check if the image element exists
if image_element is None:
print("Image element not found for:", image_path_filename)
continue
# Get the filename of the image from the image element
name = image_element.get("name")
# Check if the image file exists
if not os.path.isfile(data["Image_path"]):
print("Image file does not exist:", data["Image_path"])
else:
# Load the image
image = cv2.imread(data["Image_path"])
# Check if the image was loaded successfully
if image is None:
print("Failed to load the image:", data["Image_path"])
else:
# Create empty list to store box data for the image
box_data_image = []
# Iterate over the box elements in the image element
for box_element in image_element.iter("box"):
# Get the label of the box
box_label = box_element.get("label")
# Extract the bounding box coordinates
xmin = int(float(box_element.get("xtl")))
ymin = int(float(box_element.get("ytl")))
xmax = int(float(box_element.get("xbr")))
ymax = int(float(box_element.get("ybr")))
# Draw the bounding box on the image
cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
# Display the label text near the bounding box
label_text = f"{box_label}"
label_text = f"{box_label.replace('Bouy', 'Buoy')}"
cv2.putText(
image,
label_text,
(xmin, ymin - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.9,
(0, 255, 0),
2,
)
# Calculate the center of the box
box_center_x = (xmin + xmax) / 2
# Calculate the normalized coordinates (-1 to 1) based on the image size and center
normalized_x = (box_center_x - center_x) / image_width * camera_fov_w
# Transform the angle into the radar frame summing the yaw angle of rotation between the camera and the radar frame
box_bearing = normalized_x - camera_yaw_rad
# Store the box data in the box_data list
# Add box data to the box_data_image list
box_data_image.append(
{
"bearing": box_bearing,
"label": box_label,
}
)
# Add the box_data_image to the image_data dictionary with image name as key
image_data[name] = box_data_image
# Display the image with the bounding boxes and annotations
cv2.imshow('Image with Annotations', image)
cv2.waitKey(delay=1)
cv2.destroyAllWindows()
### ITERATE OVER THE RADAR ELEMENTS IN THE XML FILE ###
for data in filtered_data:
# Load the radar image
radar_image_path = data["Radar_path"]
radar_image = cv2.imread(radar_image_path)
# Get the image name of the corresponding camera image
image_name = os.path.basename(data["Image_path"])
# Check if the image name exists in the image_data dictionary
if image_name in image_data:
# Get the box data associated with the image
box_data = image_data[image_name]
# Draw bearing lines and highlight points on the same side of the lines
for box in box_data:
bearing = box["bearing"]
# Calculate the center point of the radar image
center_x = radar_image.shape[1] // 2
center_y = radar_image.shape[0] // 2
# Calculate the line length based on the width of the radar image
line_length = int(0.4 * radar_image.shape[1])
# Calculate the endpoint coordinates of the line
end_x = center_x + int(line_length * math.cos(bearing))
end_y = center_y + int(line_length * math.sin(bearing))
# Draw a line from the center to the endpoint
cv2.line(radar_image, (center_x, center_y), (end_x, end_y), (0, 255, 0), 2)
# Find the white spots in the radar image
white_spots = np.where(radar_image[:, :, 2] > 200)
# Calculate the distances between the white spots and the bearing line
distances = np.abs(
(end_y - center_y) * white_spots[1]
- (end_x - center_x) * white_spots[0]
+ end_x * center_y
- end_y * center_x
) / np.sqrt((end_y - center_y) ** 2 + (end_x - center_x) ** 2)
# Threshold for considering a point on the bearing line
threshold = 5 # Adjust as needed
# Initialize variables to keep track of the last highlighted point
last_highlighted_point = None
highlighted_points = []
# Highlight the points on the same side of the line
for spot_y, spot_x, distance in zip(white_spots[0], white_spots[1], distances):
dot_product = (spot_x - center_x) * (end_x - center_x) + (spot_y - center_y) * (end_y - center_y)
if distance <= threshold and dot_product >= 0:
# Check if the point is different from the last highlighted point
if (spot_x, spot_y) != last_highlighted_point:
cv2.circle(radar_image, (spot_x, spot_y), 1, (0, 0, 255), -1)
highlighted_points.append((spot_x, spot_y))
last_highlighted_point = (spot_x, spot_y)
# Extract the second highlighted point or the point following the last bearing
if len(highlighted_points) >= 2:
second_point = highlighted_points[1]
elif len(highlighted_points) == 1:
second_point = highlighted_points[0]
else:
# No highlighted points, set a default value
second_point = None
if second_point:
# Extract max range for the radar image
max_range = data["xbr_max_range"]
# Calculate the distance of the second highlighted point
x, y = second_point
distance = (
max_range
* math.sqrt((x - center_x) ** 2 + (y - center_y) ** 2)
/ line_length
)
# Store the distance in the box data
box.setdefault("distance", []).append(distance)
# Update the box data in the image_data dictionary
image_data[image_name] = box_data
# Display the radar image
cv2.imshow('Radar Image with Points on Bearing Line', radar_image)
cv2.waitKey(0)
else:
print("Image name not found in image_data:", image_name)
cv2.destroyAllWindows()
# Get the first image name from the image_data dictionary
first_image_name = next(iter(image_data.keys()))
# Get the box data for the first image
first_image_box_data = image_data[first_image_name]
# Initialize empty lists for bearings, labels, and distances
bearings = []
labels = []
distances = []
# Iterate over each box in the first image's box data
for box in first_image_box_data:
# Check if the box has a 'bearing' and 'label' key
if "bearing" in box and "label" in box:
# Append the bearing and label to the respective lists
bearings.append(box["bearing"])
labels.append(box["label"])
# Check if the box has a 'distance' key
if "distance" in box:
# Append the distance to the distances list
distances.append(
box["distance"][0]
) # Assuming there's only one distance value per box
# Create a dictionary with the extracted data for the first image
initial = {"bearing": bearings[0], "label": labels[0], "distance": distances[0]}
# Print the result dictionary
print(initial)
### UKF ###
# parameters
initial_bearing = bearings[0] # First bearing
initial_distance = distances[0] # First distance
# Convert bearing and distance to x and y coordinates
initial_x = initial_distance * np.cos(initial_bearing)
initial_y = initial_distance * np.sin(initial_bearing)
x = np.array([initial["bearing"], initial["distance"], 0, 0]) # initial state
P = np.diag([1, 1]) # initial uncertainty
points = MerweScaledSigmaPoints(n=2, alpha=1e-3, beta=2, kappa=0)
ukf = UKF(dim_x=2, dim_z=2, dt=1.5, fx=fx, hx=hx, points=points)
ukf.x = x # initial state
ukf.P = P # initial uncertainty
z_std1 = np.radians(5) # degrees
z_std2 = 20 # meters
ukf.R = np.diag([z_std1**2, z_std2**2]) # measurement noise covariance matrix
ukf.Q = Q_discrete_white_noise(dim=2, var=0.01**2, dt=1.5, block_size=2) # process noise covariance matrix
zs = [[data[1], data[3]] for data in image_data]
for i in range(1, len(boat_position)):
z = zs[i]
ukf.predict()
ukf.update(z, boat_position[i]) # Pass the boat's position for the i-th measurement
print(ukf.x, 'log-likelihood', ukf.log_likelihood)```
I have some issue implementing the UKF, some ideas?
Please Tell us what’s happening in your own words.
Learning to describe problems is hard, but it is an important part of learning how to code.
Also, the more you say, the more we can help!
Sure, What I did is to take the information from the camera sensor and identify with bounding boxes the objects: Buoys or Boats. I calculated than their bearings referred to the camera. Furthermore I used the bearing to identify the objects into the radar images. However the distance obtained could not be accurate, as a matter of fact the radar doesn’t tell me what object I am looking at. In conclusion with all the measurements of bearings and distance I wanted to implement the UKF in my code, in order to estimate the objects and than compare the resoults with the measurements. The algorithm is not on-line, but based on a preorder set of frames.
Are you looking for some sort of help with this code?
Yes, I am a bit lost in the end of it. I have no idea if my implementation is correct, and if there are other things that i need to add
are you able to help me? I have no rush, but if you need more info lmk
I have this code:
def fx(x, dt):
# state transition functioin - predict next state based
# on constant velocity model x = vt + x_0
F = np.array([[1, 0, dt, 0],
[0, 1, 0, dt]])
return np.dot(F,x)
def hx(x):
# Extract the relative NE_X and NE_Y coordinates from the state vector x
x, y = x[0], x[1]
# Calculate distance
distance = math.sqrt((x)**2 + (y)**2)
# Calculate bearing in radians
bearing = math.atan2(y, x)
return np.array([[bearing], [distance]])
### UKF ###
### Preparation of the data ###
first_bearing = bearings[0]
first_distance = distances[0]
# Calculate the position of the object related to the boat frame
relative_ne_x = first_distance * math.cos(first_bearing)
relative_ne_y = first_distance * math.sin(first_bearing)
### Define the initial values and the functions ###
# Store the values in variable x if needed
x = np.array([relative_ne_x, relative_ne_y]) # Initial state
P = np.diag([1, 1]) # initial uncertainty
However, I get this error, even if everything seems correct:
Traceback (most recent call last):
File "Desktop/LABSF/Main.py", line 354, in <module>
ukf.predict()
File "Desktop/LABSF/env/lib/python3.9/site-packages/filterpy/kalman/UKF.py", line 388, in predict
self.compute_process_sigmas(dt, fx, **fx_args)
File "Desktop/LABSF/env/lib/python3.9/site-packages/filterpy/kalman/UKF.py", line 503, in compute_process_sigmas
self.sigmas_f[i] = fx(s, dt, **fx_args)
TypeError: fx() takes 1 positional argument but 2 were given
I tried to modify all the code increasing the number of initial states with the x and y velocities, but the problem remains, instead of having 1 and passing 2 I will have 2 and passing 4.