This script extracts initial positions from trajectory files and uses them to initialize a JuPedSim
simulation.
It requires the geometry and the trajectory files.
# Geometry parsed from WKT: a single POLYGON whose first ring is the outer
# boundary and whose four further rings are interior rings (holes).
# NOTE(review): the holes presumably model barriers/obstacles of the
# experimental setup — confirm against the experiment description.
geometry = from_wkt(
    "POLYGON ((-8.88 -7.63, 8.3 -7.63, 8.3 27.95, -8.88 27.95, -8.88 -7.63), (-3.54 -1.13, -3.57 19.57, -1.52 19.57, -1.37 19.71, -1.37 21.09, -1.52 21.23, -1.67 21.23, -1.67 21.18, -1.545 21.18, -1.4200000000000002 21.065, -1.4200000000000002 19.735, -1.545 19.62, -3.6199999999999997 19.62, -3.59 -1.13, -3.54 -1.13), (3.57 -0.89, 3.64 19.64, 1.47 19.57, 1.32 19.71, 1.32 21.09, 1.47 21.23, 1.62 21.23, 1.62 21.18, 1.4949999999999999 21.18, 1.37 21.065, 1.37 19.735, 1.4949999999999999 19.62, 3.69 19.69, 3.6199999999999997 -0.89, 3.57 -0.89), (0.67 19.57, 0.82 19.71, 0.82 21.09, 0.67 21.23, 0.38 21.23, 0.23 21.09, 0.23 19.71, 0.38 19.57, 0.67 19.57), (-0.42 19.57, -0.27 19.71, -0.27 21.09, -0.42 21.23, -0.72 21.23, -0.87 21.09, -0.87 19.71, -0.72 19.57, -0.42 19.57))"
)
# Trajectory files to process; paths are relative to the current working
# directory.
filenames = [
    "../trajectories_croma/1C060_cam6_cam5_frameshift0_Combined.txt",
    "../trajectories_croma/1C070_cam6_cam5_frameshift0_Combined.txt",
    "../trajectories_croma/2C070_cam6_cam5_frameshift0_Combined.txt",
    "../trajectories_croma/2C120_cam6_cam5_frameshift0_Combined.txt",
    "../trajectories_croma/2C130_cam6_cam5_frameshift0_Combined.txt",
    "../trajectories_croma/2C150_cam6_cam5_frameshift0_Combined.txt",
]
# Quick sanity check of how many files are listed.
print(f"Got {len(filenames)} files.")
Got 6 files.
def read_json_file(file_path):
    """Load a JSON file and return the decoded Python object.

    Parameters:
    file_path: Path of the JSON file to read.

    Returns:
    The object produced by ``json.load`` (typically a dict or a list).

    Raises:
    OSError: If the file cannot be opened.
    json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Read as UTF-8 explicitly: without an encoding, open() falls back to the
    # platform locale, which can mis-decode non-ASCII characters.
    with open(file_path, 'r', encoding="utf-8") as file:
        return json.load(file)
def get_first_frame_after_max_reached(filename):
    """Load a trajectory file and find the frame just before the crowd is (almost) complete.

    The file is read as tab-separated values with the columns
    ``id, frame, x, y, z, m``; anything following a ``#`` is ignored.

    The threshold is the number of distinct ids in the whole file minus 2.
    The function looks for the first frame in which at least that many
    distinct ids are present and returns the frame number directly before it.

    Parameters:
    filename: Path of the trajectory file.

    Returns:
    tuple: ``(df, frame)`` with the full trajectory DataFrame and the frame
    number (int), or ``(pd.DataFrame(), None)`` when no frame reaches the
    threshold.
    """
    df = pd.read_csv(
        filename, sep="\t", names=["id", "frame", "x", "y", "z", "m"], comment="#"
    )
    # I don't know why 2 are missing: the threshold is deliberately two below
    # the total number of distinct ids found in the file.
    max_ids = len(np.unique(df["id"])) - 2

    # Distinct ids per frame, computed in one pass; the result is indexed by
    # frame number in ascending order.
    ids_per_frame = df.groupby("frame")["id"].nunique()
    frames_at_max = ids_per_frame.index[ids_per_frame >= max_ids]
    if len(frames_at_max) == 0:
        return pd.DataFrame(), None

    # NOTE(review): an earlier version also contained a check for the count
    # dropping to max_ids - 1 after the maximum had been reached. It could
    # never run because the function already returned as soon as the maximum
    # was reached, so it has been removed. Confirm that returning the frame
    # before the maximum is the intended result.
    return df, int(frames_at_max[0]) - 1
def plot_trajectories_around_frame(
    df: pd.DataFrame,
    target_frame: int,
    num_frames: int = 10,
    title: str = "",
    geometry: Optional[shapely.Geometry] = None,
    inifile: str = 'files/inifile.json'
) -> str:
    """Plot trajectories for num_frames before and after the target frame.

    Draws the pedestrian positions, the walkable area (if a geometry is
    given), and the doors and destinations read from ``inifile``; saves the
    figure and shows it.

    Parameters:
    df (pd.DataFrame): Trajectory data with at least the columns id, frame, x, y.
    target_frame (int): Frame that is highlighted in the plot.
    num_frames (int): Number of frames plotted before and after target_frame.
    title (str): Prefix of the plot title.
    geometry: Geometry of the walkable area; nothing is drawn when None.
    inifile (str): Path of the JSON file describing doors and destinations.

    Returns:
    str: Name of the saved image file. It is always "plot.png", so successive
    calls overwrite each other's output.
    """
    create_figure()
    # Forward num_frames; previously the callee's default of 10 was always used.
    plot_pedestrian_trajectories(df, target_frame, num_frames)
    # geometry defaults to None, which cannot be turned into a walkable area.
    if geometry is not None:
        plot_walkable_area(geometry)
    plot_doors_and_destinations(inifile)
    # NOTE(review): the title reports the frame range of the full DataFrame,
    # not of the num_frames window — confirm this is intended.
    set_plot_properties(title, df)
    figname = "plot.png"
    plt.savefig(figname)
    plt.show()
    return figname
The following is a minor helper function that selects the frames around a target frame.
def filter_dataframe(df: pd.DataFrame, target_frame: int, num_frames: int) -> pd.DataFrame:
    """
    Keep only the rows whose frame lies in a window around the target frame.

    The window spans ``target_frame - num_frames`` to
    ``target_frame + num_frames`` (both inclusive) and is clipped to the
    smallest and largest frame present in ``df``.

    Parameters:
    df (pd.DataFrame): Trajectory data; must contain a "frame" column.
    target_frame (int): Frame at the centre of the window.
    num_frames (int): Half-width of the window in frames.

    Returns:
    pd.DataFrame: The rows of ``df`` that fall inside the window.
    """
    frames = df["frame"]
    lower = max(target_frame - num_frames, frames.min())
    upper = min(target_frame + num_frames, frames.max())
    # between() is inclusive on both ends by default.
    in_window = frames.between(lower, upper)
    return df.loc[in_window]
def set_plot_properties(title: str, filtered_df: pd.DataFrame):
    """Apply axis limits, labels, title and aspect ratio to the current axes.

    The title is ``title`` followed by the smallest and largest frame number
    found in ``filtered_df``.
    """
    ax = plt.gca()
    ax.set_xlim(-7, 7)
    ax.set_xlabel("X Position")
    ax.set_ylabel("Y Position")
    frames = filtered_df["frame"]
    start_frame = frames.min()
    end_frame = frames.max()
    ax.set_title(f"{title}. Frame {start_frame} to {end_frame}")
    # Equal scaling so that distances look the same in x and y.
    ax.set_aspect('equal')
def create_figure(num_rows=1, num_cols=1, fig_width=5, fig_height=5):
    """
    Create a matplotlib figure with a grid of subplots.

    Parameters:
    num_rows (int): Number of rows in the subplot grid. Default is 1.
    num_cols (int): Number of columns in the subplot grid. Default is 1.
    fig_width (int): Width of the figure in inches. Default is 5.
    fig_height (int): Height of the figure in inches. Default is 5.

    Returns:
    tuple: The figure and the axes object(s), as returned by ``plt.subplots``.
    """
    size_in_inches = (fig_width, fig_height)
    fig, axes = plt.subplots(num_rows, num_cols, figsize=size_in_inches)
    return fig, axes
def plot_pedestrian_trajectories(df: pd.DataFrame, target_frame: int, num_frames=10):
    """Plot the positions of all pedestrians seen around the target frame.

    For every pedestrian that appears within ``num_frames`` frames of
    ``target_frame``, one marker is drawn per frame of that window (red at the
    target frame, gray otherwise). In addition, the pedestrian's complete
    trajectory from ``df`` is drawn as a thin black line.

    Parameters:
    df (pd.DataFrame): Trajectory data with the columns id, frame, x, y.
    target_frame (int): Frame whose positions are highlighted in red.
    num_frames (int): Number of frames considered before and after target_frame.
    """
    # Reuse the shared helper instead of repeating the window computation.
    window = filter_dataframe(df, target_frame, num_frames)
    for ped in np.unique(window["id"]):
        ped_window = window[window["id"] == ped]
        # groupby yields the frames in ascending order and skips frames in
        # which the pedestrian is absent, so each present frame is visited
        # once without re-scanning the pedestrian's rows for every frame.
        for frame, frame_data in ped_window.groupby("frame"):
            color = "red" if frame == target_frame else "gray"
            plt.plot(
                frame_data["x"],
                frame_data["y"],
                ".",
                color=color,
                label=f"Ped {ped} at frame {frame}",
            )
        # The thin line deliberately uses the full, unfiltered trajectory.
        ped_data = df[df["id"] == ped]
        plt.plot(
            ped_data["x"],
            ped_data["y"],
            label=f"Ped {ped}",
            color="black",
            alpha=0.5,
            lw=0.2,
        )
def plot_walkable_area(geometry: shapely.Geometry):
    """Draw the walkable area described by ``geometry`` using pedpy."""
    pedpy.plot_walkable_area(walkable_area=pedpy.WalkableArea(geometry))
def plot_doors_and_destinations(inifile: str):
    """Read the JSON inifile and draw its doors, then its destinations."""
    config = read_json_file(inifile)
    for draw in (plot_doors, plot_destinations):
        draw(config)
def plot_doors(data: dict):
    """Draw every motivation door as a red line with a marked centre.

    Doors are read from ``data["motivation_parameters"]["motivation_doors"]``.
    Around the mean of each door's vertices a translucent blue circle with
    radius ``data["motivation_parameters"]["width"]`` is drawn.
    """
    params = data["motivation_parameters"]
    doors = params["motivation_doors"]
    radius = params["width"]
    for door in doors:
        ax = plt.gca()
        xs, ys = zip(*door["vertices"])
        ax.plot(xs, ys, label=f"Door {door['id']}", color='red', lw=2)
        # Centre of the door: mean of its vertex coordinates.
        mid_x = np.mean(xs)
        mid_y = np.mean(ys)
        ax.plot(mid_x, mid_y, marker='o', color='b')
        halo = plt.Circle((mid_x, mid_y), radius, color='b', alpha=0.3, fill=True)
        ax.add_artist(halo)
def plot_destinations(data: dict):
    """Draw every destination polygon and write its id at the vertex mean.

    Destinations are read from ``data["destinations"]``; each entry provides
    "vertices" and "id".
    """
    for destination in data["destinations"]:
        corners = destination["vertices"]
        patch = plt.Polygon(
            corners,
            closed=True,
            fill=True,
            edgecolor='r',
            facecolor='lightblue',
            label=f"Exit {destination['id']}",
        )
        ax = plt.gca()
        ax.add_patch(patch)
        # Mean of the vertices per coordinate axis, used as the label anchor.
        n_corners = len(corners)
        centroid = [sum(axis_values) / n_corners for axis_values in zip(*corners)]
        ax.text(
            centroid[0],
            centroid[1],
            f"{destination['id']}",
            ha='center',
            va='center',
            fontsize=10,
            color='black',
        )
def print_agent_info(df: pd.DataFrame, filename: str, frame_after_decrease: Optional[int]) -> None:
    """Print the number of agents, the file name and the selected frame.

    Parameters:
    df (pd.DataFrame): Trajectory data. The agent count is the number of
        distinct values in its "id" column, or 0 when the column is missing.
    filename (str): Name of the trajectory file; printed between "$" signs.
    frame_after_decrease (Optional[int]): Selected frame, or None if none was found.
    """
    # A failed frame search hands over a DataFrame without any columns;
    # report zero agents instead of raising KeyError.
    agent_count = len(df["id"].unique()) if "id" in df.columns else 0
    print(f"Agents: {agent_count}")
    print(rf"${filename}$")
    print(f"{frame_after_decrease = }")
def extract_title(filename: str) -> str:
    """Return the part of the file's stem that precedes the first underscore.

    If the stem contains no underscore, the whole stem is returned.
    """
    stem = Path(filename).stem
    prefix, _, _ = stem.partition("_")
    return prefix
The CSV file written by the following function will be consumed by the simulation script.
def write_frame_data_to_file(df: pd.DataFrame, frame_after_decrease: int, filename: str) -> None:
    """Write the (x, y) positions of one frame to a CSV file.

    The output has one row per entry of the selected frame, no header, and is
    written to ``../trajectories_croma/<title>_frame_<frame>.csv`` where
    ``<title>`` is derived from ``filename`` via ``extract_title``.

    Parameters:
    df (pd.DataFrame): Trajectory data with the columns frame, x and y.
    frame_after_decrease (int): Frame whose positions are written.
    filename (str): Name of the trajectory file the data came from.
    """
    frame_data = df[df["frame"] == frame_after_decrease]
    positions = [tuple(row) for row in frame_data[['x', 'y']].to_numpy()]
    title = extract_title(filename)
    output_filename = f"../trajectories_croma/{title}_frame_{frame_after_decrease}.csv"
    # newline="" lets the csv module control line endings itself; without it
    # an extra "\r" is written on platforms that translate "\n" to "\r\n".
    with open(output_filename, 'w', newline="") as f:
        csv.writer(f).writerows(positions)
    print(f"Data written to {output_filename}")
def process_file(filename: str, geometry: shapely.geometry.Polygon) -> Optional[str]:
    """Process one trajectory file: extract the frame, write the CSV and plot.

    Parameters:
    filename (str): Path of the trajectory file.
    geometry: Geometry of the walkable area, forwarded to the plot.

    Returns:
    Optional[str]: Name of the saved figure, or None when no valid frame was
    found (in that case nothing is written or plotted).
    """
    df, frame_after_decrease = get_first_frame_after_max_reached(filename)
    if frame_after_decrease is None:
        # In this case the returned DataFrame has no columns, so the agent
        # summary cannot be printed, and there is no figure name to return.
        print("No valid frame found for visualization.")
        return None

    print_agent_info(df, filename, frame_after_decrease)
    write_frame_data_to_file(df, frame_after_decrease, filename)
    return plot_trajectories_around_frame(
        df, frame_after_decrease, num_frames=10, title=extract_title(filename), geometry=geometry
    )