Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion main_2024.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ def main() -> int:
mp_manager,
QUEUE_MAX_SIZE,
)
# Queue size of latest odometry data must be 1
flight_interface_to_decision_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
1,
)
data_merge_to_geolocation_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager,
QUEUE_MAX_SIZE,
Expand Down Expand Up @@ -214,7 +219,7 @@ def main() -> int:
FLIGHT_INTERFACE_WORKER_PERIOD,
),
input_queues=[],
output_queues=[flight_interface_to_data_merge_queue],
output_queues=[flight_interface_to_data_merge_queue, flight_interface_to_decision_queue],
controller=controller,
local_logger=main_logger,
)
Expand Down Expand Up @@ -382,6 +387,7 @@ def main() -> int:
video_input_to_detect_target_queue.fill_and_drain_queue()
detect_target_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_data_merge_queue.fill_and_drain_queue()
flight_interface_to_decision_queue.fill_and_drain_queue()
data_merge_to_geolocation_queue.fill_and_drain_queue()
geolocation_to_main_queue.fill_and_drain_queue()

Expand Down
74 changes: 74 additions & 0 deletions modules/decision/decision_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
Retrieves list of landing pads from cluster estimation and outputs decision to the flight controller.
"""

from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import decision
from . import landing_pad_tracking
from . import search_pattern


def decision_worker(
distance_squared_threshold: float,
tolerance: float,
camera_fov_forwards: float,
camera_fov_sideways: float,
search_height: float,
search_overlap: float,
small_adjustment: float,
odometry_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
cluster_input_queue: queue_proxy_wrapper.QueueProxyWrapper,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.

PARAMETERS
----------
- camera_fov_forwards, camera_fov_sideways, search_height, search_overlap, distance_squared_threshold,
and small_adjustment are arguments for the constructors below.
- cluster_input_queue and output_queue are the data queues.
- controller is how the main process communicates to this worker.
"""
# TODO: Need to rework how we get odometry data

landing_pads = landing_pad_tracking.LandingPadTracking(distance_squared_threshold)
decision_maker = decision.Decision(tolerance)
search = search_pattern.SearchPattern(
camera_fov_forwards,
camera_fov_sideways,
search_height,
search_overlap,
current_position_x=0.0,
current_position_y=0.0,
distance_squared_threshold=distance_squared_threshold,
small_adjustment=small_adjustment,
)

while not controller.is_exit_requested():
controller.check_pause()

curr_state = odometry_input_queue.queue.get()

if curr_state is None:
continue

input_data = cluster_input_queue.queue.get()

if input_data is None:
continue

is_found, best_landing_pads = landing_pads.run(input_data)

# Runs decision only if there exists a landing pad
if not is_found:
result, value = search.continue_search(curr_state)
else:
result, value = decision_maker.run(curr_state, best_landing_pads)

if not result:
continue

output_queue.queue.put(value)
10 changes: 5 additions & 5 deletions modules/decision/landing_pad_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def mark_confirmed_positive(self, detection: object_in_world.ObjectInWorld) -> N

def run(
self, detections: "list[object_in_world.ObjectInWorld]"
) -> "tuple[bool, object_in_world.ObjectInWorld | None]":
) -> "tuple[bool, list[object_in_world.ObjectInWorld] | None]":
"""
Updates the list of unconfirmed positives and returns the a first confirmed positive if
one exists, else the unconfirmed positive with the lowest variance.
Expand Down Expand Up @@ -82,9 +82,9 @@ def run(
# If new landing pad, add to list of unconfirmed positives
self.__unconfirmed_positives.append(detection)

# If there are confirmed positives, return the first one
# If there are confirmed positives, return the entire list
if len(self.__confirmed_positives) > 0:
return True, self.__confirmed_positives[0]
return True, self.__confirmed_positives

# If the list is empty, all landing pads have been visited, none are viable
if len(self.__unconfirmed_positives) == 0:
Expand All @@ -93,5 +93,5 @@ def run(
# Sort list by variance in ascending order
self.__unconfirmed_positives.sort(key=lambda x: x.spherical_variance)

# Return detection with lowest variance
return True, self.__unconfirmed_positives[0]
# Return all detections
return True, self.__unconfirmed_positives
13 changes: 13 additions & 0 deletions modules/flight_interface/flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import inspect
import os
import pathlib
import queue
import time

from utilities.workers import queue_proxy_wrapper
Expand All @@ -19,6 +20,7 @@ def flight_interface_worker(
baud_rate: int,
period: float,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
most_recent_odometry_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Expand All @@ -29,6 +31,10 @@ def flight_interface_worker(
output_queue is the data queue.
controller is how the main process communicates to this worker process.
"""
if most_recent_odometry_queue.maxsize != 1:
print("ERROR: most_recent_odometry_queue must have a maximum size of 1")
return

# TODO: Error handling

worker_name = pathlib.Path(__file__).stem
Expand Down Expand Up @@ -64,4 +70,11 @@ def flight_interface_worker(
if not result:
continue

# Replace any existing odometry data with the latest odometry data
try:
most_recent_odometry_queue.queue.get_nowait()
except queue.Empty:
pass

most_recent_odometry_queue.queue.put(value)
output_queue.queue.put(value)
4 changes: 2 additions & 2 deletions requirements-pytorch.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# For any non-Jetson computer (i.e. all developers)
--extra-index-url https://download.pytorch.org/whl/cu117
torch==1.13.1+cu117
torchvision==0.14.1+cu117
torch==2.0.1+cu117
torchvision==0.15.2+cu117
ultralytics
149 changes: 149 additions & 0 deletions tests/integration/test_decision_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Test worker process.
"""

import multiprocessing as mp
import time

from modules import drone_odometry_local
from modules import object_in_world
from modules import odometry_and_time
from modules.decision import decision
from modules.decision import decision_worker
from utilities.workers import worker_controller
from utilities.workers import queue_proxy_wrapper


DISTANCE_SQUARED_THRESHOLD = 25 # Distance is in meters

TOLERANCE = 0.1 # meters

CAMERA_FOV_FORWARDS = 90 # degrees
CAMERA_FOV_SIDEWAYS = 120 # degrees

SEARCH_HEIGHT = 10 # meters
SEARCH_OVERLAP = 0.2

SMALL_ADJUSTMENT = 0.5 # meters

WORK_COUNT = 5


def simulate_cluster_estimation_worker(
input_queue: queue_proxy_wrapper.QueueProxyWrapper,
) -> None:
"""
Places list of detected landing pads into the queue.
"""

result, object1 = object_in_world.ObjectInWorld.create(1.0, 2.0, 0.9)
assert result
assert object1 is not None

result, object2 = object_in_world.ObjectInWorld.create(4.5, 3.0, 2.0)
assert result
assert object2 is not None

result, object3 = object_in_world.ObjectInWorld.create(7.2, 2.9, 4.6)
assert result
assert object3 is not None

objects = [object1, object2, object3]

input_queue.queue.put(objects)


def simulate_flight_interface_worker(
timestamp: float, odometry_queue: queue_proxy_wrapper.QueueProxyWrapper
) -> None:
"""
Place odometry data into queue of size 1.
"""

result, drone_position = drone_odometry_local.DronePositionLocal.create(0.0, 2.0, -1.0)
assert result
assert drone_position is not None

result, drone_orientation = drone_odometry_local.DroneOrientationLocal.create_new(0.0, 0.0, 0.0)
assert result
assert drone_orientation is not None

result, drone_odometry = drone_odometry_local.DroneOdometryLocal.create(
drone_position, drone_orientation
)
assert result
assert drone_odometry is not None

result, drone_odometry_and_time = odometry_and_time.OdometryAndTime.create(drone_odometry)
assert result
assert drone_odometry_and_time is not None

drone_odometry_and_time.timestamp = timestamp

odometry_queue.queue.put(drone_odometry_and_time)


def main() -> int:
"""
Main function.
"""
controller = worker_controller.WorkerController()
mp_manager = mp.Manager()

cluster_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
odometry_input_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager, maxsize=1)
decision_output_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)

worker = mp.Process(
target=decision_worker.decision_worker,
args=(
DISTANCE_SQUARED_THRESHOLD,
TOLERANCE,
CAMERA_FOV_FORWARDS,
CAMERA_FOV_SIDEWAYS,
SEARCH_HEIGHT,
SEARCH_OVERLAP,
SMALL_ADJUSTMENT,
odometry_input_queue,
cluster_input_queue,
decision_output_queue,
controller,
),
)

# Starts the decision worker
worker.start()

# Simulate odometry data and cluster estimation
for i in range(0, WORK_COUNT):
simulate_flight_interface_worker(i, odometry_input_queue)
simulate_cluster_estimation_worker(cluster_input_queue)

time.sleep(1)

for i in range(0, WORK_COUNT):
simulate_flight_interface_worker(i, odometry_input_queue)
simulate_cluster_estimation_worker(cluster_input_queue)

controller.request_exit()

# Test
for i in range(0, WORK_COUNT * 2):
decision_output: decision.Decision = decision_output_queue.queue.get_nowait()
print(f"Decision output: {decision_output}")
assert decision_output is not None

# Teardown
odometry_input_queue.fill_and_drain_queue()
cluster_input_queue.fill_and_drain_queue()
worker.join()

return 0


if __name__ == "__main__":
result_main = main()
if result_main < 0:
print(f"ERROR: Status code: {result_main}")

print("Done!")
5 changes: 5 additions & 0 deletions tests/integration/test_flight_interface_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
FLIGHT_INTERFACE_TIMEOUT = 10.0 # seconds
FLIGHT_INTERFACE_BAUD_RATE = 57600 # symbol rate
FLIGHT_INTERFACE_WORKER_PERIOD = 0.1 # seconds
LATEST_ODOMETRY_QUEUE_MAX_SIZE = 1 # Max items allowed in latest odometry queue


def main() -> int:
Expand All @@ -28,6 +29,9 @@ def main() -> int:
mp_manager = mp.Manager()

out_queue = queue_proxy_wrapper.QueueProxyWrapper(mp_manager)
latest_odometry_queue = queue_proxy_wrapper.QueueProxyWrapper(
mp_manager, LATEST_ODOMETRY_QUEUE_MAX_SIZE
)

worker = mp.Process(
target=flight_interface_worker.flight_interface_worker,
Expand All @@ -37,6 +41,7 @@ def main() -> int:
FLIGHT_INTERFACE_BAUD_RATE,
FLIGHT_INTERFACE_WORKER_PERIOD,
out_queue,
latest_odometry_queue,
controller,
),
)
Expand Down
Loading