diff --git a/main_2024.py b/main_2024.py index d68adaa0..6d9ff098 100644 --- a/main_2024.py +++ b/main_2024.py @@ -122,6 +122,11 @@ def main() -> int: mp_manager, QUEUE_MAX_SIZE, ) + # Queue size of latest odometry data must be 1 + flight_interface_to_decision_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, + 1, + ) data_merge_to_geolocation_queue = queue_proxy_wrapper.QueueProxyWrapper( mp_manager, QUEUE_MAX_SIZE, @@ -214,7 +219,7 @@ def main() -> int: FLIGHT_INTERFACE_WORKER_PERIOD, ), input_queues=[], - output_queues=[flight_interface_to_data_merge_queue], + output_queues=[flight_interface_to_data_merge_queue, flight_interface_to_decision_queue], controller=controller, local_logger=main_logger, ) @@ -382,6 +387,7 @@ def main() -> int: video_input_to_detect_target_queue.fill_and_drain_queue() detect_target_to_data_merge_queue.fill_and_drain_queue() flight_interface_to_data_merge_queue.fill_and_drain_queue() + flight_interface_to_decision_queue.fill_and_drain_queue() data_merge_to_geolocation_queue.fill_and_drain_queue() geolocation_to_main_queue.fill_and_drain_queue() diff --git a/modules/decision/decision_worker.py b/modules/decision/decision_worker.py new file mode 100644 index 00000000..6421831f --- /dev/null +++ b/modules/decision/decision_worker.py @@ -0,0 +1,74 @@ +""" +Retrieves list of landing pads from cluster estimation and outputs decision to the flight controller. +""" + +from utilities.workers import queue_proxy_wrapper +from utilities.workers import worker_controller +from . import decision +from . import landing_pad_tracking +from . import search_pattern + + +def decision_worker( + distance_squared_threshold: float, + tolerance: float, + camera_fov_forwards: float, + camera_fov_sideways: float, + search_height: float, + search_overlap: float, + small_adjustment: float, + odometry_input_queue: queue_proxy_wrapper.QueueProxyWrapper, + cluster_input_queue: queue_proxy_wrapper.QueueProxyWrapper, + output_queue: queue_proxy_wrapper.QueueProxyWrapper, + controller: worker_controller.WorkerController, +) -> None: + """ + Worker process. + + PARAMETERS + ---------- + - camera_fov_forwards, camera_fov_sideways, search_height, search_overlap, distance_squared_threshold, + and small_adjustment are arguments for the constructors below. + - cluster_input_queue and output_queue are the data queues. + - controller is how the main process communicates to this worker. + """ + # TODO: Need to rework how we get odometry data + + landing_pads = landing_pad_tracking.LandingPadTracking(distance_squared_threshold) + decision_maker = decision.Decision(tolerance) + search = search_pattern.SearchPattern( + camera_fov_forwards, + camera_fov_sideways, + search_height, + search_overlap, + current_position_x=0.0, + current_position_y=0.0, + distance_squared_threshold=distance_squared_threshold, + small_adjustment=small_adjustment, + ) + + while not controller.is_exit_requested(): + controller.check_pause() + + curr_state = odometry_input_queue.queue.get() + + if curr_state is None: + continue + + input_data = cluster_input_queue.queue.get() + + if input_data is None: + continue + + is_found, best_landing_pads = landing_pads.run(input_data) + + # Runs decision only if there exists a landing pad + if not is_found: + result, value = search.continue_search(curr_state) + else: + result, value = decision_maker.run(curr_state, best_landing_pads) + + if not result: + continue + + output_queue.queue.put(value) diff --git a/modules/decision/landing_pad_tracking.py b/modules/decision/landing_pad_tracking.py index c5b0e796..15090cbb 100644 --- a/modules/decision/landing_pad_tracking.py +++ b/modules/decision/landing_pad_tracking.py @@ -54,7 +54,7 @@ def mark_confirmed_positive(self, detection: object_in_world.ObjectInWorld) -> N def run( self, detections: "list[object_in_world.ObjectInWorld]" - ) -> "tuple[bool, object_in_world.ObjectInWorld | None]": + ) -> "tuple[bool, list[object_in_world.ObjectInWorld] | None]": """ Updates the list of unconfirmed positives and returns the a first confirmed positive if one exists, else the unconfirmed positive with the lowest variance. @@ -82,9 +82,9 @@ def run( # If new landing pad, add to list of unconfirmed positives self.__unconfirmed_positives.append(detection) - # If there are confirmed positives, return the first one + # If there are confirmed positives, return the entire list if len(self.__confirmed_positives) > 0: - return True, self.__confirmed_positives[0] + return True, self.__confirmed_positives # If the list is empty, all landing pads have been visited, none are viable if len(self.__unconfirmed_positives) == 0: @@ -93,5 +93,5 @@ def run( # Sort list by variance in ascending order self.__unconfirmed_positives.sort(key=lambda x: x.spherical_variance) - # Return detection with lowest variance - return True, self.__unconfirmed_positives[0] + # Return all detections + return True, self.__unconfirmed_positives diff --git a/modules/flight_interface/flight_interface_worker.py b/modules/flight_interface/flight_interface_worker.py index a4175dd7..f766dc97 100644 --- a/modules/flight_interface/flight_interface_worker.py +++ b/modules/flight_interface/flight_interface_worker.py @@ -5,6 +5,7 @@ import inspect import os import pathlib +import queue import time from utilities.workers import queue_proxy_wrapper @@ -19,6 +20,7 @@ def flight_interface_worker( baud_rate: int, period: float, output_queue: queue_proxy_wrapper.QueueProxyWrapper, + most_recent_odometry_queue: queue_proxy_wrapper.QueueProxyWrapper, controller: worker_controller.WorkerController, ) -> None: """ @@ -29,6 +31,10 @@ def flight_interface_worker( output_queue is the data queue. controller is how the main process communicates to this worker process. """ + if most_recent_odometry_queue.maxsize != 1: + print("ERROR: most_recent_odometry_queue must have a maximum size of 1") + return + # TODO: Error handling worker_name = pathlib.Path(__file__).stem @@ -64,4 +70,11 @@ def flight_interface_worker( if not result: continue + # Replace any existing odometry data with the latest odometry data + try: + most_recent_odometry_queue.queue.get_nowait() + except queue.Empty: + pass + + most_recent_odometry_queue.queue.put(value) output_queue.queue.put(value) diff --git a/requirements-pytorch.txt b/requirements-pytorch.txt index 1a8037b1..f9bacd7a 100644 --- a/requirements-pytorch.txt +++ b/requirements-pytorch.txt @@ -1,5 +1,5 @@ # For any non-Jetson computer (i.e. all developers) --extra-index-url https://download.pytorch.org/whl/cu117 -torch==1.13.1+cu117 -torchvision==0.14.1+cu117 +torch==2.0.1+cu117 +torchvision==0.15.2+cu117 ultralytics diff --git a/tests/integration/test_decision_worker.py b/tests/integration/test_decision_worker.py new file mode 100644 index 00000000..6f402bd0 --- /dev/null +++ b/tests/integration/test_decision_worker.py @@ -0,0 +1,149 @@ +""" +Test worker process. +""" + +import multiprocessing as mp +import time + +from modules import drone_odometry_local +from modules import object_in_world +from modules import odometry_and_time +from modules.decision import decision +from modules.decision import decision_worker +from utilities.workers import worker_controller +from utilities.workers import queue_proxy_wrapper + + +DISTANCE_SQUARED_THRESHOLD = 25 # Distance is in meters + +TOLERANCE = 0.1 # meters + +CAMERA_FOV_FORWARDS = 90 # degrees +CAMERA_FOV_SIDEWAYS = 120 # degrees + +SEARCH_HEIGHT = 10 # meters +SEARCH_OVERLAP = 0.2 + +SMALL_ADJUSTMENT = 0.5 # meters + +WORK_COUNT = 5 + + +def simulate_cluster_estimation_worker( + input_queue: queue_proxy_wrapper.QueueProxyWrapper, +) -> None: + """ + Places list of detected landing pads into the queue. + """ + + result, object1 = object_in_world.ObjectInWorld.create(1.0, 2.0, 0.9) + assert result + assert object1 is not None + + result, object2 = object_in_world.ObjectInWorld.create(4.5, 3.0, 2.0) + assert result + assert object2 is not None + + result, object3 = object_in_world.ObjectInWorld.create(7.2, 2.9, 4.6) + assert result + assert object3 is not None + + objects = [object1, object2, object3] + + input_queue.queue.put(objects) + + +def simulate_flight_interface_worker( + timestamp: float, odometry_queue: queue_proxy_wrapper.QueueProxyWrapper +) -> None: + """ + Place odometry data into queue of size 1. + """ + + result, drone_position = drone_odometry_local.DronePositionLocal.create(0.0, 2.0, -1.0) + assert result + assert drone_position is not None + + result, drone_orientation = drone_odometry_local.DroneOrientationLocal.create_new(0.0, 0.0, 0.0) + assert result + assert drone_orientation is not None + + result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create( + drone_position, drone_orientation + ) + assert result + assert drone_odometry is not None + + result, drone_odometry_and_time = odometry_and_time.OdometryAndTime.create(drone_odometry) + assert result + assert drone_odometry_and_time is not None + + drone_odometry_and_time.timestamp = timestamp + + odometry_queue.queue.put(drone_odometry_and_time) + + +def main() -> int: + """ + Main function. + """ + controller = worker_controller.WorkerController() + mp_manager = mp.Manager() + + cluster_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + odometry_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager, maxsize=1) + decision_output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + + worker = mp.Process( + target=decision_worker.decision_worker, + args=( + DISTANCE_SQUARED_THRESHOLD, + TOLERANCE, + CAMERA_FOV_FORWARDS, + CAMERA_FOV_SIDEWAYS, + SEARCH_HEIGHT, + SEARCH_OVERLAP, + SMALL_ADJUSTMENT, + odometry_input_queue, + cluster_input_queue, + decision_output_queue, + controller, + ), + ) + + # Starts the decision worker + worker.start() + + # Simulate odometry data and cluster estimation + for i in range(0, WORK_COUNT): + simulate_flight_interface_worker(i, odometry_input_queue) + simulate_cluster_estimation_worker(cluster_input_queue) + + time.sleep(1) + + for i in range(0, WORK_COUNT): + simulate_flight_interface_worker(i, odometry_input_queue) + simulate_cluster_estimation_worker(cluster_input_queue) + + controller.request_exit() + + # Test + for i in range(0, WORK_COUNT * 2): + decision_output: decision.Decision = decision_output_queue.queue.get_nowait() + print(f"Decision output: {decision_output}") + assert decision_output is not None + + # Teardown + odometry_input_queue.fill_and_drain_queue() + cluster_input_queue.fill_and_drain_queue() + worker.join() + + return 0 + + +if __name__ == "__main__": + result_main = main() + if result_main < 0: + print(f"ERROR: Status code: {result_main}") + + print("Done!") diff --git a/tests/integration/test_flight_interface_worker.py b/tests/integration/test_flight_interface_worker.py index 10d767fe..703bc150 100644 --- a/tests/integration/test_flight_interface_worker.py +++ b/tests/integration/test_flight_interface_worker.py @@ -16,6 +16,7 @@ FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds +LATEST_ODOMETRY_QUEUE_MAX_SIZE = 1 # Max items allowed in latest odometry queue def main() -> int: @@ -28,6 +29,9 @@ def main() -> int: mp_manager = mp.Manager() out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager) + latest_odometry_queue = queue_proxy_wrapper.QueueProxyWrapper( + mp_manager, LATEST_ODOMETRY_QUEUE_MAX_SIZE + ) worker = mp.Process( target=flight_interface_worker.flight_interface_worker, @@ -37,6 +41,7 @@ def main() -> int: FLIGHT_INTERFACE_BAUD_RATE, FLIGHT_INTERFACE_WORKER_PERIOD, out_queue, + latest_odometry_queue, controller, ), )