diff --git a/library/drone.py b/library/drone.py new file mode 100644 index 0000000..600f0c1 --- /dev/null +++ b/library/drone.py @@ -0,0 +1,162 @@ +""" +Copyright MIT and Harvey Mudd College +MIT License +Summer 2022 + +Defines the interface of the Drone module of the racecar_core library. +""" + +import abc +import copy +import numpy as np +from nptyping import NDArray + + +class Drone(abc.ABC): + """ + Returns the color image captured by the drone's camera. + """ + + # The dimensions of the image in pixels + _WIDTH: int = 640 + _HEIGHT: int = 480 + + + def get_image(self) -> NDArray[(480, 640, 3), np.uint8]: + """ + Returns a deep copy of the current color image captured by the drone camera. + + Returns: + An array representing the pixels in the image, organized as follows + 0th dimension: pixel rows, indexed from top to bottom. + 1st dimension: pixel columns, indexed from left to right. + 2nd dimension: pixel color channels, in the blue-green-red format. + + Note: + Each color value ranges from 0 to 255. + + This function returns a deep copy of the captured image. This means that + we can modify the returned image and it will not change the image returned + by future calls to get_image(). + + Example:: + + # Initialize image with a deep copy of the most recent color image captured + # by the camera + image = rc.drone.get_image() + + # Store the amount of blue in the pixel on row 3, column 5 + blue = image[3][5][0] + """ + return copy.deepcopy(self.get_image_no_copy()) + + @abc.abstractmethod + def get_image_no_copy(self) -> NDArray[(480, 640, 3), np.uint8]: + """ + Returns a direct reference to the color image captured by the drone's camera. + + Returns: + An array representing the pixels in the image, organized as follows + 0th dimension: pixel rows, indexed from top to bottom. + 1st dimension: pixel columns, indexed from left to right. + 2nd dimension: pixel color channels, in the blue-green-red format. + + Warning: + Do not modify the returned image. The returned image is a reference to the + captured image, so any changes will also affect the images returned by any + future calls to get_image() or get_image_no_copy(). + + Note: + Each color value ranges from 0 to 255. + + Unlike get_image(), this function does not create a deep copy of the + captured image, and is therefore more efficient. + + Example:: + + # Initialize image with a direct reference to the recent color image + # captured by the drone's camera + image = rc.drone.get_image() + + # Store the amount of blue in the pixel on row 3, column 5 + blue = image[3][5][0] + + # We can safely call crop because it does not modify the source image + cropped_image = rc_utils.crop(image, (0, 0), (10, 10)) + + # However, if we wish to draw on the image, we must first create a manual + # copy with copy.deepcopy() + image_copy = copy.deepcopy(image) + modified_image = rc_utils.draw_circle(image_copy, (50, 50)) + """ + pass + + @abc.abstractmethod + def get_image_async(self) -> NDArray[(480, 640, 3), np.uint8]: + """ + Returns the current color image without the car in "go" mode. + + Returns: + An array representing the pixels in the image, organized as follows + 0th dimension: pixel rows, indexed from top to bottom. + 1st dimension: pixel columns, indexed from left to right. + 2nd dimension: pixel color channels, in the blue-green-red format. + + Note: + Each color value ranges from 0 to 255. + + Warning: + This function breaks the start-update paradigm and should only be used in + Jupyter Notebook. + + Example:: + + # Initialize image with the most recent color image captured by the camera + image = rc.drone.get_image_async() + + # Store the amount of blue in the pixel on row 3, column 5 + blue = image[3][5][0] + """ + pass + + @abc.abstractmethod + def get_height(self) -> float: + """ + Returns the height of the drone. + + Returns: + A float that represents the height of the drone. + + Example:: + + # Store the drone's height in the variable drone_height + drone_height = rc.drone.get_height() + """ + pass + + @abc.abstractmethod + def set_height(self, height: float) -> None: + """ + Sets the drone's height. + + Args: + height: The vertical height the drone should fly to. + + Example:: + + # Have the drone fly to an altitude of 100 meters. + rc.drone.set_height(100) + """ + pass + + @abc.abstractmethod + def return_to_car(self) -> None: + """ + Lands the drone back on the car + + Example:: + + # Have the drone land back on the car. + rc.drone.return_to_car() + """ + pass diff --git a/library/physics.py b/library/physics.py index 5ad59cb..b48ca93 100644 --- a/library/physics.py +++ b/library/physics.py @@ -68,3 +68,29 @@ def get_angular_velocity(self) -> NDArray[3, np.float32]: yaw = ang_vel[1] """ pass + + @abc.abstractmethod + def get_position(self) -> NDArray[3, np.float32]: + """ + Returns a 3D vector containing the car's position relative to the Unity scene's global origin + as x, y, and z coordinates. + + Returns: + The position of the car along the (x, y, z) axes during + the last frame in meters. + + Note: + The x axis points East. + The y axis points directly up, perpendicular to the ground (corresponds to altitude). + The z axis points North. + + Example:: + + # position_vector stores the position during the previous frame + position_vector = rc.physics.get_position() + + x_position = position_vector[0] + y_position = position_vector[1] + z_position = position_vector[2] + """ + pass diff --git a/library/racecar_core.py b/library/racecar_core.py index 56b4bd6..0dda9b8 100644 --- a/library/racecar_core.py +++ b/library/racecar_core.py @@ -16,6 +16,7 @@ import drive import lidar import physics +import drone import racecar_utils as rc_utils @@ -33,6 +34,7 @@ def __init__(self) -> None: self.drive: drive.Drive self.lidar: lidar.Lidar self.physics: physics.Physics + self.drone: drone.Drone @abc.abstractmethod def go(self) -> None: diff --git a/library/simulation/drone_sim.py b/library/simulation/drone_sim.py new file mode 100644 index 0000000..db97368 --- /dev/null +++ b/library/simulation/drone_sim.py @@ -0,0 +1,61 @@ +import sys +import numpy as np +import cv2 as cv +from nptyping import NDArray +import struct + +from drone import Drone + +class DroneSim(Drone): + def __init__(self, racecar) -> None: + self.__racecar = racecar + self.__drone_image: NDArray[(480, 640, 3), np.uint8] = None + self.__is_drone_image_current: bool = False + + def get_image_no_copy(self) -> NDArray[(480, 640, 3), np.uint8]: + if not self.__is_drone_image_current: + self.__drone_image = self.__request_drone_image(False) + self.__is_drone_image_current = True + + return self.__drone_image + + def get_image_async(self) -> NDArray[(480, 640, 3), np.uint8]: + return self.__request_drone_image(True) + + def get_height(self) -> float: + self.__racecar._RacecarSim__send_header( + self.__racecar.Header.drone_get_height + ) + value = struct.unpack("f", self.__racecar._RacecarSim__receive_data())[0] + return value + + def set_height(self, height: float) -> None: + self.__racecar._RacecarSim__send_data( + struct.pack( + "Bf", self.__racecar.Header.drone_set_height.value, height, + ) + ) + + def return_to_car(self) -> None: + self.__racecar._RacecarSim__send_header( + self.__racecar.Header.drone_return_to_car + ) + + def __update(self) -> None: + self.__is_drone_image_current = False + + def __request_drone_image(self, isAsync: bool) -> NDArray[(480, 640), np.uint8]: + # Ask for the current color image + self.__racecar._RacecarSim__send_header( + self.__racecar.Header.drone_get_image, isAsync + ) + + # Read the color image as 32 packets + raw_bytes = self.__racecar._RacecarSim__receive_fragmented( + 32, self._WIDTH * self._HEIGHT * 4, isAsync + ) + drone_image = np.frombuffer(raw_bytes, dtype=np.uint8) + drone_image = np.reshape(drone_image, (self._HEIGHT, self._WIDTH, 4), "C") + + drone_image = cv.cvtColor(drone_image, cv.COLOR_RGB2BGR) + return drone_image diff --git a/library/simulation/physics_sim.py b/library/simulation/physics_sim.py index 6dfe710..3c2fe68 100644 --- a/library/simulation/physics_sim.py +++ b/library/simulation/physics_sim.py @@ -22,3 +22,10 @@ def get_angular_velocity(self) -> NDArray[3, np.float32]: ) values = struct.unpack("fff", self.__racecar._RacecarSim__receive_data(12)) return np.array(values) + + def get_position(self) -> NDArray[3, np.float32]: + self.__racecar._RacecarSim__send_header( + self.__racecar.Header.physics_get_position + ) + values = struct.unpack("fff", self.__racecar._RacecarSim__receive_data(12)) + return np.array(values) diff --git a/library/simulation/racecar_core_sim.py b/library/simulation/racecar_core_sim.py index f4ce960..f76a37f 100644 --- a/library/simulation/racecar_core_sim.py +++ b/library/simulation/racecar_core_sim.py @@ -20,6 +20,7 @@ import drive_sim import lidar_sim import physics_sim +import drone_sim from racecar_core import Racecar import racecar_utils as rc_utils @@ -29,7 +30,7 @@ class RacecarSim(Racecar): __IP = "127.0.0.1" __UNITY_PORT = (__IP, 5065) __UNITY_ASYNC_PORT = (__IP, 5064) - __VERSION = 1 + __VERSION = 2 class Header(IntEnum): """ @@ -65,6 +66,11 @@ class Header(IntEnum): lidar_get_samples = 26 physics_get_linear_acceleration = 27 physics_get_angular_velocity = 28 + physics_get_position = 29 + drone_get_image = 30 + drone_get_height = 31 + drone_set_height = 32 + drone_return_to_car = 33 class Error(IntEnum): """ @@ -112,6 +118,7 @@ def __init__(self, isHeadless: bool = False) -> None: self.drive = drive_sim.DriveSim(self) self.physics = physics_sim.PhysicsSim(self) self.lidar = lidar_sim.LidarSim(self) + self.drone = drone_sim.DroneSim(self) self.__start: Callable[[], None] self.__update: Callable[[], None] @@ -230,6 +237,7 @@ def __handle_update(self) -> None: self.camera._CameraSim__update() self.controller._ControllerSim__update() self.lidar._LidarSim__update() + self.drone._DroneSim__update() def __handle_sigint(self, signal_received: int, frame) -> None: # Send exit command to sync port if we are in the middle of servicing a start