Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pufferlib/config/ocean/drive.ini
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ reward_timestep = 0.000025
reward_overspeed = 0.05
reward_ade = 0.0

; --- Logging ---
; EMA coefficient for per-agent log aggregation. Each agent slot tracks a
; smoothed estimate of its completed-episode metrics; on every completion
; slot = alpha*slot + (1 - alpha)*new_episode. alpha=0 collapses to "most
; recent episode only", alpha->1 freezes the slot at its first observation.
; alpha=0.707 has a half-life of ~2 episodes per agent.
log_ema_alpha = 0.707

; --- Map ---
; Path to map used for training
map_dir = "pufferlib/resources/drive/binaries/carla"
Expand Down
18 changes: 17 additions & 1 deletion pufferlib/ocean/drive/binding.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,29 @@ static PyObject *map_cache_live_count_py(
return PyLong_FromLong(live);
}

static void prepare_log(Drive *env);
static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args);

// clang-format off
#define MY_METHODS \
{"map_cache_size", map_cache_size_py, METH_NOARGS, "Map cache slot count."}, \
{"map_cache_live_count", map_cache_live_count_py, METH_NOARGS, "Map cache live count."}
{"map_cache_live_count", map_cache_live_count_py, METH_NOARGS, "Map cache live count."}, \
{"vec_prepare_log", vec_prepare_log_py, METH_VARARGS, "Aggregate per-agent log buffers into each env->log."}
// clang-format on

#include "../env_binding.h"

static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args) {
VecEnv *vec = unpack_vecenv(args);
if (!vec) {
return NULL;
}
for (int i = 0; i < vec->num_envs; i++) {
prepare_log((Drive *) vec->envs[i]);
}
Py_RETURN_NONE;
}

static int my_put(Env *env, PyObject *args, PyObject *kwargs) {
PyObject *obs = PyDict_GetItemString(kwargs, "observations");
if (!PyObject_TypeCheck(obs, &PyArray_Type)) {
Expand Down Expand Up @@ -1979,6 +1994,7 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) {
env->reward_randomization = (bool) unpack(kwargs, "reward_randomization");
env->compute_eval_metrics = (bool) unpack(kwargs, "compute_eval_metrics");
env->eval_mode = (int) unpack(kwargs, "eval_mode");
env->log_ema_alpha = (float) unpack(kwargs, "log_ema_alpha");
env->obs_norm_goal_offset_m = (float) unpack(kwargs, "obs_norm_goal_offset_m");
env->obs_norm_xy_offset_m = (float) unpack(kwargs, "obs_norm_xy_offset_m");
env->obs_norm_veh_length_m = (float) unpack(kwargs, "obs_norm_veh_length_m");
Expand Down
156 changes: 104 additions & 52 deletions pufferlib/ocean/drive/drive.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,13 @@ struct Drive {
int next_episode_index;
int completed_episodes_count;
CompletedEpisodeSummary completed_episodes[COMPLETED_EPISODE_QUEUE_CAPACITY];

// Per-agent EMA state, one slot per active controllable agent. Allocated
// in init() once active_agent_count is known.
Log *per_agent_log_ema;
int *per_agent_log_count;
int *per_agent_has_data;
float log_ema_alpha;
};

typedef struct {
Expand Down Expand Up @@ -2693,100 +2700,135 @@ static float calculate_puffer_score(Log *log_agent, float duration_steps, float
return log_agent->puffer_score;
}

// Sum each agent's current EMA slot into env->log and set env->log.n to the
// number of slots that have ever been seeded. vec_log divides by aggregate.n
// downstream to produce the cross-agent mean.
static void prepare_log(Drive *env) {
memset(&env->log, 0, sizeof(Log));
int num_keys = sizeof(Log) / sizeof(float);
int num_with_data = 0;
for (int a = 0; a < env->active_agent_count; a++) {
if (!env->per_agent_has_data[a]) {
continue;
}
float *slot = (float *) &env->per_agent_log_ema[a];
float *dst = (float *) &env->log;
for (int j = 0; j < num_keys; j++) {
dst[j] += slot[j];
}
num_with_data++;
}
env->log.n = (float) num_with_data;
}

static void add_log(Drive *env) {
int safe_timestep = (env->timestep > 0) ? env->timestep : 1;
float alpha = env->log_ema_alpha;
float one_minus_alpha = 1.0f - alpha;
int num_log_keys = sizeof(Log) / sizeof(float);
for (int i = 0; i < env->active_agent_count; i++) {
Agent *agent = &env->agents[env->active_agent_indices[i]];
float episode_duration_s = env->logs[i].episode_length * env->dt;
float reference_progress_distance = PUFFER_PROGRESS_REFERENCE_SPEED * episode_duration_s;
reference_progress_distance = fmaxf(reference_progress_distance, 1.0f);
env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance;

Log episode_log = {0};

int offroad = env->logs[i].offroad_rate;
env->log.offroad_rate += offroad;
episode_log.offroad_rate = offroad;
int collided = env->logs[i].collision_rate;
env->log.collision_rate += collided;
episode_log.collision_rate = collided;
int red_light_violations = env->logs[i].red_light_violation_rate;
env->log.red_light_violation_rate += red_light_violations;
episode_log.red_light_violation_rate = red_light_violations;
int total_infractions = (offroad || collided || red_light_violations) ? 1 : 0;
float avg_speed_per_agent = env->logs[i].avg_speed_per_agent;
env->log.avg_speed_per_agent += avg_speed_per_agent / safe_timestep;
episode_log.avg_speed_per_agent = env->logs[i].avg_speed_per_agent / safe_timestep;
int num_waypoints_reached = env->logs[i].num_waypoints_reached;
env->log.num_waypoints_reached += num_waypoints_reached;
episode_log.num_waypoints_reached = num_waypoints_reached;
int num_goals_reached = env->logs[i].num_goals_reached;
env->log.num_goals_reached += num_goals_reached;
// Score: 1 per agent that reached all 3 target waypoints without
// being removed/stopped. Was hardcoded to >=4, unreachable given
// num_target_waypoints=3 in the ini, so score was always 0.
episode_log.num_goals_reached = num_goals_reached;
// Score: 1 if the agent reached all 3 target waypoints without
// being removed/stopped, else 0. Was hardcoded to >=4, unreachable
// given num_target_waypoints=3 in the ini, so score was always 0.
if (num_goals_reached >= 3 && !agent->removed && !agent->stopped) {
env->log.score += 1.0f;
episode_log.score = 1.0f;
}
if (!offroad && !collided && !red_light_violations && num_waypoints_reached < 1) {
env->log.dnf_rate += 1.0f;
episode_log.dnf_rate = 1.0f;
}
env->log.total_distance_travelled += agent->distance_since_spawn;
episode_log.total_distance_travelled = agent->distance_since_spawn;
if (total_infractions > 0) {
env->log.total_infractions += 1.0f;
episode_log.total_infractions = 1.0f;
}
float displacement_error = env->logs[i].avg_displacement_error;
env->log.avg_displacement_error += displacement_error;
env->log.episode_length += env->logs[i].episode_length;
env->log.episode_return += env->logs[i].episode_return;
episode_log.avg_displacement_error = env->logs[i].avg_displacement_error;
episode_log.episode_length = env->logs[i].episode_length;
episode_log.episode_return = env->logs[i].episode_return;
// Per-component reward sums (mirrors compute_rewards' env->rewards[i]+= sites).
env->log.reward_collision += env->logs[i].reward_collision;
env->log.reward_offroad += env->logs[i].reward_offroad;
env->log.reward_red_light += env->logs[i].reward_red_light;
env->log.reward_goal += env->logs[i].reward_goal;
env->log.reward_lane_align += env->logs[i].reward_lane_align;
env->log.reward_lane_center += env->logs[i].reward_lane_center;
env->log.reward_comfort += env->logs[i].reward_comfort;
env->log.reward_velocity += env->logs[i].reward_velocity;
env->log.reward_timestep += env->logs[i].reward_timestep;
env->log.reward_reverse += env->logs[i].reward_reverse;
env->log.reward_overspeed += env->logs[i].reward_overspeed;
env->log.reward_ade += env->logs[i].reward_ade;
episode_log.reward_collision = env->logs[i].reward_collision;
episode_log.reward_offroad = env->logs[i].reward_offroad;
episode_log.reward_red_light = env->logs[i].reward_red_light;
episode_log.reward_goal = env->logs[i].reward_goal;
episode_log.reward_lane_align = env->logs[i].reward_lane_align;
episode_log.reward_lane_center = env->logs[i].reward_lane_center;
episode_log.reward_comfort = env->logs[i].reward_comfort;
episode_log.reward_velocity = env->logs[i].reward_velocity;
episode_log.reward_timestep = env->logs[i].reward_timestep;
episode_log.reward_reverse = env->logs[i].reward_reverse;
episode_log.reward_overspeed = env->logs[i].reward_overspeed;
episode_log.reward_ade = env->logs[i].reward_ade;
// Comfort and velocity metrics (normalized per timestep)
env->log.comfort_violation_count += env->logs[i].comfort_violation_count / safe_timestep;
env->log.velocity_progress_sum += env->logs[i].velocity_progress_sum / safe_timestep;
episode_log.comfort_violation_count = env->logs[i].comfort_violation_count / safe_timestep;
episode_log.velocity_progress_sum = env->logs[i].velocity_progress_sum / safe_timestep;
// Lane metrics (normalized per timestep for average per episode)
env->log.lane_center_rate += env->logs[i].lane_center_rate / safe_timestep;
env->log.lane_heading_aligned_rate += env->logs[i].lane_heading_aligned_rate / safe_timestep;
episode_log.lane_center_rate = env->logs[i].lane_center_rate / safe_timestep;
episode_log.lane_heading_aligned_rate = env->logs[i].lane_heading_aligned_rate / safe_timestep;
if (env->compute_eval_metrics) {
env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance;
env->logs[i].comfort_score = calculate_duration_scaled_violation_score(
env->logs[i].comfort_violation_timestep_count,
env->logs[i].episode_length,
env->dt);
calculate_puffer_score(&env->logs[i], env->logs[i].episode_length, env->dt);
env->log.at_fault_collision_rate += env->logs[i].at_fault_collision_rate;
env->log.ttc_within_bound_rate += env->logs[i].ttc_within_bound_rate;
env->log.wrong_way_distance += env->logs[i].wrong_way_distance;
env->log.speed_violation_sum += env->logs[i].speed_violation_sum;
env->log.progress_ratio += env->logs[i].progress_ratio;
env->log.comfort_score += env->logs[i].comfort_score;
env->log.ttc_violations += env->logs[i].ttc_violations;
env->log.ttc_samples += env->logs[i].ttc_samples;
env->log.multi_lane_time += env->logs[i].multi_lane_time;
env->log.multi_lane_score += env->logs[i].multi_lane_score;
episode_log.at_fault_collision_rate = env->logs[i].at_fault_collision_rate;
episode_log.ttc_within_bound_rate = env->logs[i].ttc_within_bound_rate;
episode_log.wrong_way_distance = env->logs[i].wrong_way_distance;
episode_log.speed_violation_sum = env->logs[i].speed_violation_sum;
episode_log.progress_ratio = env->logs[i].progress_ratio;
episode_log.comfort_score = env->logs[i].comfort_score;
episode_log.ttc_violations = env->logs[i].ttc_violations;
episode_log.ttc_samples = env->logs[i].ttc_samples;
episode_log.multi_lane_time = env->logs[i].multi_lane_time;
episode_log.multi_lane_score = env->logs[i].multi_lane_score;

float wrong_dist = env->logs[i].wrong_way_distance;
float direction_score = (wrong_dist <= 2.0f) ? 1.0f : (wrong_dist <= 6.0f) ? 0.5f : 0.0f;
env->log.driving_direction_score += direction_score;
episode_log.driving_direction_score = direction_score;

float T = safe_timestep * env->dt;
float speed_compliance = fmaxf(0.0f, 1.0f - env->logs[i].speed_violation_sum / fmaxf(T, 1e-3f));
env->log.speed_limit_compliance += speed_compliance;
episode_log.speed_limit_compliance = speed_compliance;

float making_progress = (env->logs[i].progress_ratio > 0.2f) ? 1.0f : 0.0f;
env->log.making_progress_rate += making_progress;
env->log.puffer_score += env->logs[i].puffer_score;
episode_log.making_progress_rate = making_progress;
episode_log.puffer_score = env->logs[i].puffer_score;
}

env->log.n += 1;
episode_log.expert_static_car_count = env->expert_static_agent_count;
episode_log.static_car_count = env->static_agent_count;

Log *slot = &env->per_agent_log_ema[i];
if (!env->per_agent_has_data[i]) {
*slot = episode_log;
env->per_agent_has_data[i] = 1;
} else {
float *slot_f = (float *) slot;
float *ep_f = (float *) &episode_log;
for (int j = 0; j < num_log_keys; j++) {
slot_f[j] = alpha * slot_f[j] + one_minus_alpha * ep_f[j];
}
}
env->per_agent_log_count[i] += 1;
}
// Log composition counts per agent so vec_log averaging recovers the per-env value
env->log.expert_static_car_count += env->expert_static_agent_count;
env->log.static_car_count += env->static_agent_count;

if (env->emit_completed_episodes && env->completed_episodes_count < COMPLETED_EPISODE_QUEUE_CAPACITY) {
// Snapshot per-episode aggregates from env->logs[] before c_reset
Expand Down Expand Up @@ -3649,6 +3691,9 @@ void init(Drive *env) {
env->human_agent_idx = 0;
env->timestep = 0;
env->shared_map = NULL;
env->per_agent_log_ema = NULL;
env->per_agent_log_count = NULL;
env->per_agent_has_data = NULL;

struct SharedMapData *shared = env->use_map_cache ? map_cache_lookup(env->map_name) : NULL;
if (shared != NULL) {
Expand Down Expand Up @@ -3698,6 +3743,10 @@ void init(Drive *env) {
env->logs_capacity = 0;
set_active_agents(env);
env->logs_capacity = env->active_agent_count;

env->per_agent_log_ema = (Log *) calloc(env->active_agent_count, sizeof(Log));
env->per_agent_log_count = (int *) calloc(env->active_agent_count, sizeof(int));
env->per_agent_has_data = (int *) calloc(env->active_agent_count, sizeof(int));
if (env->simulation_mode == SIMULATION_REPLAY) {
remove_bad_trajectories(env);
}
Expand Down Expand Up @@ -3812,6 +3861,9 @@ void c_close(Drive *env) {
free(env->tracks_to_predict);
free(env->map_name);
free(env->ini_file);
free(env->per_agent_log_ema);
free(env->per_agent_log_count);
free(env->per_agent_has_data);
}

static int compute_observation_size(Drive *env) {
Expand Down
8 changes: 8 additions & 0 deletions pufferlib/ocean/drive/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(
init_step=0,
eval_mode=0,
num_eval_scenarios=16,
log_ema_alpha=0.707,
init_mode="create_all_valid",
control_mode="control_vehicles",
map_dir=None,
Expand Down Expand Up @@ -169,6 +170,7 @@ def __init__(
raise ValueError(f"dynamics_model must be 'classic' or 'jerk'. Got: {dynamics_model}")
self.eval_mode = eval_mode
self.num_eval_scenarios = num_eval_scenarios
self.log_ema_alpha = log_ema_alpha
self.termination_mode = termination_mode
self.inactive_agent_threshold = inactive_agent_threshold
self.rng = np.random.default_rng(seed)
Expand Down Expand Up @@ -418,6 +420,7 @@ def _env_init_kwargs(self, map_file, max_agents):
"reward_randomization": self.reward_randomization,
"compute_eval_metrics": self.compute_eval_metrics,
"eval_mode": self.eval_mode,
"log_ema_alpha": self.log_ema_alpha,
"obs_norm_goal_offset_m": self.obs_norm_goal_offset_m,
"obs_norm_xy_offset_m": self.obs_norm_xy_offset_m,
"obs_norm_veh_length_m": self.obs_norm_veh_length_m,
Expand Down Expand Up @@ -477,6 +480,11 @@ def step(self, actions):
self._reset_compact_replay_buffer(env_slot, scenarios_after[env_slot])
info.append(tagged)
if self.tick % self.report_interval == 0:
# Drain per-agent accumulators into each env->log so the shared
# vec_log produces a per-agent population mean (every agent slot
# contributes one term, regardless of how many episodes it
# completed within the window).
binding.vec_prepare_log(self.c_envs)
log = binding.vec_log(self.c_envs, self.num_agents)
if log:
info.append(log)
Expand Down
50 changes: 19 additions & 31 deletions pufferlib/ocean/env_binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -704,40 +704,33 @@ static PyObject *vec_log(PyObject *self, PyObject *args) {

// Iterates over logs one float at a time. Will break
// horribly if Log has non-float data.
PyObject *num_agents_arg = PyTuple_GetItem(args, 1);
float num_agents = (float) PyLong_AsLong(num_agents_arg);
int num_keys = sizeof(Log) / sizeof(float);

Env *env = vec->envs[0];
if (env->eval_mode) {
PyObject *list = PyList_New(vec->num_envs);
PyObject *dict = PyDict_New();

if (env->log.n == 0) {
return dict;
}
PyObject *list = PyList_New(0);

// Got enough data. Reset logs and return metrics
for (int i = 0; i < vec->num_envs; i++) {
PyObject *dict = PyDict_New();
Env *env = vec->envs[i];
float n = env->log.n;
// Average across agents
for (int i = 0; i < num_keys; i++) {
((float *) &env->log)[i] /= n;
Env *env_i = vec->envs[i];
float n = env_i->log.n;
if (n == 0) {
continue;
}
my_log(dict, env, &env->log, n);
for (int j = 0; j < num_keys; j++) {
((float *) &env_i->log)[j] /= n;
}
PyObject *dict = PyDict_New();
my_log(dict, env_i, &env_i->log, n);
assign_to_dict(dict, "n", n);
// Add map_name to dict
if (env->map_name) {
PyObject *s = PyUnicode_FromString(env->map_name);
if (env_i->map_name) {
PyObject *s = PyUnicode_FromString(env_i->map_name);
if (s != NULL) {
PyDict_SetItemString(dict, "map_name", s);
Py_DECREF(s);
}
}

PyList_SetItem(list, i, dict);
PyList_Append(list, dict);
Py_DECREF(dict);
}
// Reset logs to 0 after extracting metrics (prevents accumulation across episodes)
for (int i = 0; i < vec->num_envs; i++) {
Expand All @@ -758,16 +751,11 @@ static PyObject *vec_log(PyObject *self, PyObject *args) {

PyObject *dict = PyDict_New();

// Only log if we have at least num_agents worth of data
Env *env = vec->envs[0];
if (env->eval_mode) {
if (aggregate.n == 0) {
return dict;
}
} else {
if (aggregate.n < num_agents) {
return dict;
}
// aggregate.n is the divisor for every field below; skip the emission
// when no env has contributed any data (n=0 would divide by zero and
// the dict would carry no signal anyway).
if (aggregate.n < 1) {
return dict;
}

// Got enough data. Reset logs and return metrics
Expand Down
Loading
Loading