Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 22 additions & 30 deletions competition_code/competition_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,33 +120,34 @@ async def evaluate_solution(

# Spawn vehicle and sensors to receive data
waypoints = world.maneuverable_waypoints
yaw_vector = waypoints[1].location - waypoints[0].location
initial_yaw = np.arctan2(yaw_vector[1], yaw_vector[0])
initial_rpy = np.array([0, 0, initial_yaw])

vehicle = world.spawn_vehicle(
"vehicle.tesla.model3",
waypoints[0].location + np.array([0,0,1]),
waypoints[0].roll_pitch_yaw,
True,
"vehicle.audi.tt",
waypoints[0].location + np.array([0, 0, 2.5]),
initial_rpy, # ← aligned with path
True
)


assert vehicle is not None

print(f" Vehicle spawned: {vehicle}")

camera = vehicle.attach_camera_sensor(
roar_py_interface.RoarPyCameraSensorDataRGB,
np.array([-2.0 * vehicle.bounding_box.extent[0], 0.0, 3.0 * vehicle.bounding_box.extent[2]]), # relative position
np.array([0, 10/180.0*np.pi, 0]), # relative rotation
np.array([-2.0 * vehicle.bounding_box.extent[0], 0.0, 3.0 * vehicle.bounding_box.extent[2]]),
np.array([0, 10/180.0*np.pi, 0]),
image_width=1024,
image_height=768
)
location_sensor = vehicle.attach_location_in_world_sensor()
velocity_sensor = vehicle.attach_velocimeter_sensor()
rpy_sensor = vehicle.attach_roll_pitch_yaw_sensor()
occupancy_map_sensor = vehicle.attach_occupancy_map_sensor(
50,
50,
2.0,
2.0
)
collision_sensor = vehicle.attach_collision_sensor(
np.zeros(3),
np.zeros(3)
)
occupancy_map_sensor = vehicle.attach_occupancy_map_sensor(50, 50, 2.0, 2.0)
collision_sensor = vehicle.attach_collision_sensor(np.zeros(3), np.zeros(3))

assert camera is not None
assert location_sensor is not None
Expand All @@ -156,7 +157,6 @@ async def evaluate_solution(
assert collision_sensor is not None


# Start to run solution
solution : RoarCompetitionSolution = solution_constructor(
waypoints,
RoarCompetitionAgentWrapper(vehicle),
Expand All @@ -167,40 +167,31 @@ async def evaluate_solution(
occupancy_map_sensor,
collision_sensor
)
rule = RoarCompetitionRule(waypoints * 3,vehicle,world) # 3 laps

rule = RoarCompetitionRule(waypoints * 3, vehicle, world) # 3 laps

for _ in range(20):
await world.step()

rule.initialize_race()
# vehicle.close()
# exit()

# Timer starts here
start_time = world.last_tick_elapsed_seconds
current_time = start_time
await vehicle.receive_observation()
await solution.initialize()


while True:
# terminate if time out
current_time = world.last_tick_elapsed_seconds
if current_time - start_time > max_seconds:
vehicle.close()
return None

# receive sensors' data
await vehicle.receive_observation()

await rule.tick()

# terminate if there is major collision
collision_impulse_norm = np.linalg.norm(collision_sensor.get_last_observation().impulse_normal)
if collision_impulse_norm > 100.0:
# vehicle.close()
print(f"major collision of tensity {collision_impulse_norm}")
# return None
print(f"Major collision! Intensity: {collision_impulse_norm:.2f}")
await rule.respawn()

if rule.lap_finished():
Expand All @@ -214,7 +205,7 @@ async def evaluate_solution(
await solution.step()
await world.step()

print("end of the loop")
print("🏁 End of the loop")
end_time = world.last_tick_elapsed_seconds
vehicle.close()
if enable_visualization:
Expand All @@ -224,6 +215,7 @@ async def evaluate_solution(
"elapsed_time" : end_time - start_time,
}


async def main():
carla_client = carla.Client('127.0.0.1', 2000)
carla_client.set_timeout(5.0)
Expand Down
162 changes: 106 additions & 56 deletions competition_code/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,37 @@
Competition instructions:
Please do not change anything else but fill out the to-do sections.
"""
print("submission.py loaded")

from typing import List, Tuple, Dict, Optional
from typing import List
import roar_py_interface
import numpy as np

def normalize_rad(rad : float):

def normalize_rad(rad: float):
return (rad + np.pi) % (2 * np.pi) - np.pi

def filter_waypoints(location : np.ndarray, current_idx: int, waypoints : List[roar_py_interface.RoarPyWaypoint]) -> int:
def dist_to_waypoint(waypoint : roar_py_interface.RoarPyWaypoint):
return np.linalg.norm(
location[:2] - waypoint.location[:2]
)

def filter_waypoints(location: np.ndarray, current_idx: int, waypoints: List[roar_py_interface.RoarPyWaypoint]) -> int:
def dist_to_waypoint(waypoint: roar_py_interface.RoarPyWaypoint):
return np.linalg.norm(location[:2] - waypoint.location[:2])
for i in range(current_idx, len(waypoints) + current_idx):
if dist_to_waypoint(waypoints[i%len(waypoints)]) < 3:
if dist_to_waypoint(waypoints[i % len(waypoints)]) < 3:
return i % len(waypoints)
return current_idx


class RoarCompetitionSolution:
def __init__(
self,
maneuverable_waypoints: List[roar_py_interface.RoarPyWaypoint],
vehicle : roar_py_interface.RoarPyActor,
camera_sensor : roar_py_interface.RoarPyCameraSensor = None,
location_sensor : roar_py_interface.RoarPyLocationInWorldSensor = None,
velocity_sensor : roar_py_interface.RoarPyVelocimeterSensor = None,
rpy_sensor : roar_py_interface.RoarPyRollPitchYawSensor = None,
occupancy_map_sensor : roar_py_interface.RoarPyOccupancyMapSensor = None,
collision_sensor : roar_py_interface.RoarPyCollisionSensor = None,
vehicle: roar_py_interface.RoarPyActor,
camera_sensor: roar_py_interface.RoarPyCameraSensor = None,
location_sensor: roar_py_interface.RoarPyLocationInWorldSensor = None,
velocity_sensor: roar_py_interface.RoarPyVelocimeterSensor = None,
rpy_sensor: roar_py_interface.RoarPyRollPitchYawSensor = None,
occupancy_map_sensor: roar_py_interface.RoarPyOccupancyMapSensor = None,
collision_sensor: roar_py_interface.RoarPyCollisionSensor = None,
) -> None:
self.maneuverable_waypoints = maneuverable_waypoints
self.vehicle = vehicle
Expand All @@ -40,69 +42,117 @@ def __init__(
self.rpy_sensor = rpy_sensor
self.occupancy_map_sensor = occupancy_map_sensor
self.collision_sensor = collision_sensor


self._prev_steer = 0.0
self._steer_alpha = 0.5 # stronger smoothing
self._base_speed = 20.0 # normal cruising speed

async def initialize(self) -> None:
# TODO: You can do some initial computation here if you want to.
# For example, you can compute the path to the first waypoint.
self._prev_steer = 0.0
self._last_yaw = None
self._last_dt = 0.05

# Receive location, rotation and velocity data
vehicle_location = self.location_sensor.get_last_gym_observation()
vehicle_rotation = self.rpy_sensor.get_last_gym_observation()
vehicle_velocity = self.velocity_sensor.get_last_gym_observation()
vehicle_heading = np.array([np.cos(vehicle_rotation[2]), np.sin(vehicle_rotation[2])])

self.current_waypoint_idx = 10
self.current_waypoint_idx = filter_waypoints(
vehicle_location,
self.current_waypoint_idx,
self.maneuverable_waypoints
)
# Pick a waypoint ahead of the car
min_cost = np.inf
best_idx = 0
for i, wp in enumerate(self.maneuverable_waypoints):
vec_to_wp = wp.location[:2] - vehicle_location[:2]
if np.dot(vec_to_wp, vehicle_heading) > 0: # must be in front
cost = np.linalg.norm(vec_to_wp)
if cost < min_cost:
min_cost = cost
best_idx = i

# 🔹 Skip ahead to avoid clipping first corner
self.current_waypoint_idx = (best_idx + 10) % len(self.maneuverable_waypoints)
self._startup_counter = 50 # ~50 steps (~2–3 seconds) of cautious driving

async def step(
self
) -> None:
"""
This function is called every world step.
Note: You should not call receive_observation() on any sensor here, instead use get_last_observation() to get the last received observation.
You can do whatever you want here, including apply_action() to the vehicle.
"""
# TODO: Implement your solution here.

# Receive location, rotation and velocity data
async def step(self) -> None:
vehicle_location = self.location_sensor.get_last_gym_observation()
vehicle_rotation = self.rpy_sensor.get_last_gym_observation()
vehicle_velocity = self.velocity_sensor.get_last_gym_observation()
vehicle_velocity_norm = np.linalg.norm(vehicle_velocity)

# Find the waypoint closest to the vehicle
v = float(np.linalg.norm(vehicle_velocity))
yaw = float(vehicle_rotation[2])
pos_xy = vehicle_location[:2]

if self._last_yaw is None:
self._last_yaw = yaw

# Update waypoint index
self.current_waypoint_idx = filter_waypoints(
vehicle_location,
self.current_waypoint_idx,
self.maneuverable_waypoints
)
# We use the 3rd waypoint ahead of the current waypoint as the target waypoint
waypoint_to_follow = self.maneuverable_waypoints[(self.current_waypoint_idx + 3) % len(self.maneuverable_waypoints)]

# Calculate delta vector towards the target waypoint
vector_to_waypoint = (waypoint_to_follow.location - vehicle_location)[:2]
heading_to_waypoint = np.arctan2(vector_to_waypoint[1],vector_to_waypoint[0])
# --- PARAMETERS FOR SAFE DRIVING ---
num_fit_points = 15
lookahead_distance = 12.0
max_speed = 20.0
min_speed = 5.0

# Gather waypoints ahead
wp_xy = np.array([
self.maneuverable_waypoints[(self.current_waypoint_idx + i) % len(self.maneuverable_waypoints)].location[:2]
for i in range(num_fit_points)
])

rot = np.array([[np.cos(-yaw), -np.sin(-yaw)], [np.sin(-yaw), np.cos(-yaw)]])
local_wp = (wp_xy - pos_xy) @ rot.T

# Ensure most waypoints are in front
forward_points = [pt for pt in local_wp if pt[0] > 1.0]
if len(forward_points) < len(local_wp) // 2:
print("Too many waypoints behind car — braking to re-center")
await self.vehicle.apply_action({
"throttle": 0.0, "steer": 0.0, "brake": 1.0,
"hand_brake": 0.0, "reverse": 0, "target_gear": 0
})
return

# Fit curve
x = local_wp[:, 0]
y = local_wp[:, 1]
coeffs = np.polyfit(x, y, 2)

# Target farther point
x_target = lookahead_distance
y_target = np.polyval(coeffs, x_target)

# Steering
steer_angle = np.arctan2(2.0 * y_target, lookahead_distance)
raw_steer = float(np.clip(steer_angle / 0.7, -1.0, 1.0))
steer_control = (1 - self._steer_alpha) * self._prev_steer + self._steer_alpha * raw_steer
self._prev_steer = steer_control

# --- SPEED CONTROL ---
curvature = abs(coeffs[0])
target_speed = max(min_speed, max_speed * (1.0 - 12.0 * curvature))
target_speed = np.clip(target_speed, min_speed, max_speed)

# Calculate delta angle towards the target waypoint
delta_heading = normalize_rad(heading_to_waypoint - vehicle_rotation[2])
# During startup, keep speed capped
if self._startup_counter > 0:
self._startup_counter -= 1
target_speed = min(target_speed, 8.0)

# Proportional controller to steer the vehicle towards the target waypoint
steer_control = (
-8.0 / np.sqrt(vehicle_velocity_norm) * delta_heading / np.pi
) if vehicle_velocity_norm > 1e-2 else -np.sign(delta_heading)
steer_control = np.clip(steer_control, -1.0, 1.0)
speed_error = target_speed - v
accel = 0.1 * speed_error
throttle = np.clip(accel, 0.0, 1.0)
brake = np.clip(-accel, 0.0, 1.0)

# Proportional controller to control the vehicle's speed towards 40 m/s
throttle_control = 0.05 * (20 - vehicle_velocity_norm)
# --- EMERGENCY COLLISION STOP ---
if self.collision_sensor and self.collision_sensor.get_last_gym_observation():
print("Collision detected — emergency stop")
throttle, brake = 0.0, 1.0

control = {
"throttle": np.clip(throttle_control, 0.0, 1.0),
"throttle": throttle,
"steer": steer_control,
"brake": np.clip(-throttle_control, 0.0, 1.0),
"brake": brake,
"hand_brake": 0.0,
"reverse": 0,
"target_gear": 0
Expand Down