-
Notifications
You must be signed in to change notification settings - Fork 0
/
trajectory.py
72 lines (60 loc) · 2.41 KB
/
trajectory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from dataclasses import dataclass
from typing import Dict
import pandas as pd
from pedestrian import Pedestrian
from grid import Grid
import numpy as np
import os
@dataclass
class Trajectory:
"""
Class containing the trajectory data
"""
traj = pd.DataFrame(columns=['step', 'id', 'x', 'y', 'exit'])
space_usage = None
def __init__(self, grid: Grid, num_steps: int):
"""
Constructor
:param grid: grid to use
:param num_steps: number of compute steps
"""
shape = (num_steps, grid.gridX.shape[0], grid.gridX.shape[1])
self.space_usage = np.zeros(shape)
def add_step(self, step: int, grid: Grid, peds: Dict[int, Pedestrian], output_path):
"""
Append trajectory data.
:param step: Current time step
:param grid: Grid to use
:param peds: Pedestrians in simulation
:param output_path: output_path
"""
step_frame = pd.DataFrame(columns=['step', 'id', 'x', 'y', 'exit'])
for key, ped in peds.items():
# add trajectory
x, y = grid.get_coordinates(ped.pos[0], ped.pos[1])
step_frame_ped = {'step': step, 'id': key, 'x': x, 'y': y, 'exit': ped.exit_id}
step_frame = step_frame.append(step_frame_ped, ignore_index=True)
# update space usage
if step != 0:
self.space_usage[step][ped.pos[0]][ped.pos[1]] = self.space_usage[step - 1][ped.pos[0]][ped.pos[1]] + 1
else:
self.space_usage[step][ped.pos[0]][ped.pos[1]] = self.space_usage[0][ped.pos[0]][ped.pos[1]] + 1
traj_filename = os.path.join(output_path, 'traj.csv')
if step == 0:
with open(traj_filename, 'w') as f:
step_frame.to_csv(f, header=True, index=False)
else:
with open(traj_filename, 'a') as f:
step_frame.to_csv(f, header=False, index=False)
self.traj = self.traj.append(step_frame, ignore_index=True)
def save(self, output_path):
"""
Saves the current state of the trajectory
:param output_path: output directory
"""
traj_filename = os.path.join(output_path, 'traj.csv')
self.traj.to_csv(traj_filename)
step = self.space_usage.shape[0] - 1
suffix = 'space_usage.txt'
su_filename = os.path.join(output_path, suffix)
np.savetxt(su_filename, self.space_usage[step])