Skip to content
162 changes: 162 additions & 0 deletions library/drone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""
Copyright MIT and Harvey Mudd College
MIT License
Summer 2022

Defines the interface of the Drone module of the racecar_core library.
"""

import abc
import copy
import numpy as np
from nptyping import NDArray


class Drone(abc.ABC):
"""
Returns the color image captured by the drone's camera.
"""

# The dimensions of the image in pixels
_WIDTH: int = 640
_HEIGHT: int = 480


def get_image(self) -> NDArray[(480, 640, 3), np.uint8]:
"""
Returns a deep copy of the current color image captured by the drone camera.

Returns:
An array representing the pixels in the image, organized as follows
0th dimension: pixel rows, indexed from top to bottom.
1st dimension: pixel columns, indexed from left to right.
2nd dimension: pixel color channels, in the blue-green-red format.

Note:
Each color value ranges from 0 to 255.

This function returns a deep copy of the captured image. This means that
we can modify the returned image and it will not change the image returned
by future calls to get_image().

Example::

# Initialize image with a deep copy of the most recent color image captured
# by the camera
image = rc.drone.get_image()

# Store the amount of blue in the pixel on row 3, column 5
blue = image[3][5][0]
"""
return copy.deepcopy(self.get_image_no_copy())

@abc.abstractmethod
def get_image_no_copy(self) -> NDArray[(480, 640, 3), np.uint8]:
"""
Returns a direct reference to the color image captured by the drone's camera.

Returns:
An array representing the pixels in the image, organized as follows
0th dimension: pixel rows, indexed from top to bottom.
1st dimension: pixel columns, indexed from left to right.
2nd dimension: pixel color channels, in the blue-green-red format.

Warning:
Do not modify the returned image. The returned image is a reference to the
captured image, so any changes will also affect the images returned by any
future calls to get_image() or get_image_no_copy().

Note:
Each color value ranges from 0 to 255.

Unlike get_image(), this function does not create a deep copy of the
captured image, and is therefore more efficient.

Example::

# Initialize image with a direct reference to the recent color image
# captured by the drone's camera
image = rc.drone.get_image()

# Store the amount of blue in the pixel on row 3, column 5
blue = image[3][5][0]

# We can safely call crop because it does not modify the source image
cropped_image = rc_utils.crop(image, (0, 0), (10, 10))

# However, if we wish to draw on the image, we must first create a manual
# copy with copy.deepcopy()
image_copy = copy.deepcopy(image)
modified_image = rc_utils.draw_circle(image_copy, (50, 50))
"""
pass

@abc.abstractmethod
def get_image_async(self) -> NDArray[(480, 640, 3), np.uint8]:
"""
Returns the current color image without the car in "go" mode.

Returns:
An array representing the pixels in the image, organized as follows
0th dimension: pixel rows, indexed from top to bottom.
1st dimension: pixel columns, indexed from left to right.
2nd dimension: pixel color channels, in the blue-green-red format.

Note:
Each color value ranges from 0 to 255.

Warning:
This function breaks the start-update paradigm and should only be used in
Jupyter Notebook.

Example::

# Initialize image with the most recent color image captured by the camera
image = rc.drone.get_image_async()

# Store the amount of blue in the pixel on row 3, column 5
blue = image[3][5][0]
"""
pass

@abc.abstractmethod
def get_height(self) -> float:
"""
Returns the height of the drone.

Returns:
A float that represents the height of the drone.

Example::

# Store the drone's height in the variable drone_height
drone_height = rc.drone.get_height()
"""
pass

@abc.abstractmethod
def set_height(self, height: float) -> None:
"""
Sets the drone's height.

Args:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an Example:: here.

height: The vertical height the drone should fly to.

Example::

# Have the drone fly to an altitude of 100 meters.
rc.drone.set_height(100)
"""
pass

@abc.abstractmethod
def return_to_car(self) -> None:
"""
Lands the drone back on the car

Example::

# Have the drone land back on the car.
rc.drone.return_to_car()
"""
pass
26 changes: 26 additions & 0 deletions library/physics.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,29 @@ def get_angular_velocity(self) -> NDArray[3, np.float32]:
yaw = ang_vel[1]
"""
pass

@abc.abstractmethod
def get_position(self) -> NDArray[3, np.float32]:
"""
Returns a 3D vector containing the car's position relative to the Unity scene's global origin
as x, y, and z coordinates.

Returns:
The position of the car along the (x, y, z) axes during
the last frame in meters.

Note:
The x axis points East.
The y axis points directly up, perpendicular to the ground (corresponds to altitude).
The z axis points North.

Example::

# position_vector stores the position during the previous frame
position_vector = rc.physics.get_position()

x_position = position_vector[0]
y_position = position_vector[1]
z_position = position_vector[2]
"""
pass
2 changes: 2 additions & 0 deletions library/racecar_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import drive
import lidar
import physics
import drone

import racecar_utils as rc_utils

Expand All @@ -33,6 +34,7 @@ def __init__(self) -> None:
self.drive: drive.Drive
self.lidar: lidar.Lidar
self.physics: physics.Physics
self.drone: drone.Drone

@abc.abstractmethod
def go(self) -> None:
Expand Down
61 changes: 61 additions & 0 deletions library/simulation/drone_sim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import sys
import numpy as np
import cv2 as cv
from nptyping import NDArray
import struct

from drone import Drone

class DroneSim(Drone):
def __init__(self, racecar) -> None:
self.__racecar = racecar
self.__drone_image: NDArray[(480, 640, 3), np.uint8] = None
self.__is_drone_image_current: bool = False

def get_image_no_copy(self) -> NDArray[(480, 640, 3), np.uint8]:
if not self.__is_drone_image_current:
self.__drone_image = self.__request_drone_image(False)
self.__is_drone_image_current = True

return self.__drone_image

def get_image_async(self) -> NDArray[(480, 640, 3), np.uint8]:
return self.__request_drone_image(True)

def get_height(self) -> float:
self.__racecar._RacecarSim__send_header(
self.__racecar.Header.drone_get_height
)
value = struct.unpack("f", self.__racecar._RacecarSim__receive_data())[0]
return value

def set_height(self, height: float) -> None:
self.__racecar._RacecarSim__send_data(
struct.pack(
"Bf", self.__racecar.Header.drone_set_height.value, height,
)
)

def return_to_car(self) -> None:
self.__racecar._RacecarSim__send_header(
self.__racecar.Header.drone_return_to_car
)

def __update(self) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Organize by public and private methods (ie move __update and __request_drone_image to the bottom)

self.__is_drone_image_current = False

def __request_drone_image(self, isAsync: bool) -> NDArray[(480, 640), np.uint8]:
# Ask for the current color image
self.__racecar._RacecarSim__send_header(
self.__racecar.Header.drone_get_image, isAsync
)

# Read the color image as 32 packets
raw_bytes = self.__racecar._RacecarSim__receive_fragmented(
32, self._WIDTH * self._HEIGHT * 4, isAsync
)
drone_image = np.frombuffer(raw_bytes, dtype=np.uint8)
drone_image = np.reshape(drone_image, (self._HEIGHT, self._WIDTH, 4), "C")

drone_image = cv.cvtColor(drone_image, cv.COLOR_RGB2BGR)
return drone_image
7 changes: 7 additions & 0 deletions library/simulation/physics_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@ def get_angular_velocity(self) -> NDArray[3, np.float32]:
)
values = struct.unpack("fff", self.__racecar._RacecarSim__receive_data(12))
return np.array(values)

def get_position(self) -> NDArray[3, np.float32]:
self.__racecar._RacecarSim__send_header(
self.__racecar.Header.physics_get_position
)
values = struct.unpack("fff", self.__racecar._RacecarSim__receive_data(12))
return np.array(values)
10 changes: 9 additions & 1 deletion library/simulation/racecar_core_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import drive_sim
import lidar_sim
import physics_sim
import drone_sim
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to change __VERSION = 1 to 2 since you are updating the communication protocol.


from racecar_core import Racecar
import racecar_utils as rc_utils
Expand All @@ -29,7 +30,7 @@ class RacecarSim(Racecar):
__IP = "127.0.0.1"
__UNITY_PORT = (__IP, 5065)
__UNITY_ASYNC_PORT = (__IP, 5064)
__VERSION = 1
__VERSION = 2

class Header(IntEnum):
"""
Expand Down Expand Up @@ -65,6 +66,11 @@ class Header(IntEnum):
lidar_get_samples = 26
physics_get_linear_acceleration = 27
physics_get_angular_velocity = 28
physics_get_position = 29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any changes in Simulation/PythonInterface.cs will need to be made here too

drone_get_image = 30
drone_get_height = 31
drone_set_height = 32
drone_return_to_car = 33

class Error(IntEnum):
"""
Expand Down Expand Up @@ -112,6 +118,7 @@ def __init__(self, isHeadless: bool = False) -> None:
self.drive = drive_sim.DriveSim(self)
self.physics = physics_sim.PhysicsSim(self)
self.lidar = lidar_sim.LidarSim(self)
self.drone = drone_sim.DroneSim(self)

self.__start: Callable[[], None]
self.__update: Callable[[], None]
Expand Down Expand Up @@ -230,6 +237,7 @@ def __handle_update(self) -> None:
self.camera._CameraSim__update()
self.controller._ControllerSim__update()
self.lidar._LidarSim__update()
self.drone._DroneSim__update()

def __handle_sigint(self, signal_received: int, frame) -> None:
# Send exit command to sync port if we are in the middle of servicing a start
Expand Down