-
Notifications
You must be signed in to change notification settings - Fork 0
/
CA.py
135 lines (115 loc) · 4.92 KB
/
CA.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from numpy.random import choice
from floorfield import *
from plotting import *
from IO import save_floor_field
class CA:
    """
    Simulator for waiting pedestrians with a CA.

    :param static_ff: The underlying static floor fields for all pedestrians, keyed by exit id
    :param simulation_parameters: The used simulation parameters (see class:SimulationParameters)
    """
    static_ff = None
    simulation_parameters: SimulationParameters

    def __init__(self, simulation_parameters: SimulationParameters, geometry: Geometry, grid: Grid):
        """
        Init the CA and pre-compute the static floor fields.

        :param simulation_parameters: simulation parameters to use
        :param geometry: geometry to use
        :param grid: grid to use
        """
        self.simulation_parameters = simulation_parameters
        self.static_ff = compute_static_ff(geometry, grid, self.simulation_parameters)

    def compute_step(self, geometry: Geometry, grid: Grid) -> None:
        """
        Computes one time step of the simulation.

        Standing pedestrians keep their cell. For every other pedestrian the
        individual floor field is combined with the static floor field of the
        pedestrian's exit, turned into neighbor probabilities, and one step is
        drawn from them. The chosen steps are then applied to the geometry.

        :param geometry: geometry to use
        :param grid: grid to use
        """
        next_step = {}
        prob_next_step = {}
        for ped_id, ped in geometry.pedestrians.items():
            if ped.standing:
                # Store the bare member (not a one-element list): apply_step
                # indexes the neighbors with it and solve_conflicts compares
                # it against Neighbors.self.
                next_step[ped_id] = Neighbors.self
                continue

            individual_ff = compute_individual_ff(geometry, grid, ped, self.simulation_parameters)
            combined = compute_overall_ff(geometry, grid, self.static_ff[ped.exit_id], individual_ff)
            prob_neighbor = compute_prob_neighbors(
                geometry, grid, ped, combined, self.simulation_parameters.w_direction
            )
            step = self.compute_next_step(prob_neighbor)
            next_step[ped_id] = step
            prob_next_step[ped_id] = prob_neighbor[step]

        self.apply_step(geometry, grid, next_step, prob_next_step)

    @staticmethod
    def compute_next_step(prob):
        """
        Chooses the next step based on *prob*.

        :param prob: Probabilities for the corresponding neighboring fields
                     (mapping step -> probability)
        :return: Next step, i.e. one of the keys of *prob*
        """
        keys = list(prob)
        probs = list(prob.values())
        # Draw an index instead of the key itself: numpy would otherwise
        # coerce the keys into an array and hand back a numpy scalar rather
        # than the original key object.
        return keys[choice(len(keys), p=probs)]

    @staticmethod
    def apply_step(geometry: Geometry, grid: Grid, next_step, prob_next_step) -> None:
        """
        Applies the current step to pedestrians and resolves occurring conflicts.

        :param geometry: geometry to use
        :param grid: grid to use
        :param next_step: next steps of all pedestrians (pedestrian id -> step)
        :param prob_next_step: probabilities of next steps (pedestrian id -> probability)
        """
        targets = {
            ped_id: grid.get_neighbors(geometry, geometry.pedestrians[ped_id].pos)[step]
            for ped_id, step in next_step.items()
        }

        # NOTE(review): conflicts are resolved in a single pass. A pedestrian
        # sent back to its own cell is not re-checked against pedestrians
        # targeting that cell - presumably occupied cells never get chosen
        # upstream; verify against compute_prob_neighbors.
        conflicts = CA.find_conflicts(targets)
        CA.solve_conflicts(geometry, targets, next_step, prob_next_step, conflicts)

        for ped_id, target in targets.items():
            geometry.pedestrians[ped_id].set_pos(target)

    @staticmethod
    def solve_conflicts(geometry: Geometry, targets, next_step, prob_next_step, conflicts) -> None:
        """
        Solves occurring conflicts by using relative probabilities.

        For each conflict exactly one pedestrian keeps its target; the targets
        of all others are reset in place to their current cell. A pedestrian
        that stays on its cell always wins, otherwise the winner is drawn with
        probabilities proportional to the probabilities of the chosen steps.

        :param geometry: geometry to use
        :param targets: target fields for each pedestrian (modified in place)
        :param next_step: targeted field for each pedestrian
        :param prob_next_step: probabilities of each targeted field
        :param conflicts: conflicting targets, each a list of pedestrian ids
        :return:
        """
        for conflict in conflicts:
            staying = [ped_id for ped_id in conflict if next_step[ped_id] == Neighbors.self]
            if staying:
                # A pedestrian that does not move cannot give way: its
                # fallback cell is the contested cell itself, so it has to
                # win with certainty rather than with a very large weight.
                winner = staying[0]
            else:
                weights = [prob_next_step[ped_id] for ped_id in conflict]
                total = sum(weights)
                # p=None makes choice fall back to a uniform draw if all
                # weights happen to be zero.
                probs = [weight / total for weight in weights] if total > 0 else None
                winner = conflict[choice(len(conflict), p=probs)]

            for ped_id in conflict:
                if ped_id != winner:
                    ped = geometry.pedestrians[ped_id]
                    targets[ped_id] = [ped.i(), ped.j()]

    @staticmethod
    def find_conflicts(targets):
        """
        Finds conflicts in the *targets*, checks if two or more pedestrians target the same cell.

        :param targets: Targeted cells (pedestrian id -> cell)
        :return: Conflicts as a list of lists; each inner list holds the ids
                 (keys of *targets*) of the pedestrians targeting the same
                 cell, ordered by first occurrence
        """
        # Group by equality instead of hashing: cells may be plain lists.
        groups = []
        for ped_id, target in targets.items():
            for cell, members in groups:
                if cell == target:
                    members.append(ped_id)
                    break
            else:
                groups.append((target, [ped_id]))

        return [members for _, members in groups if len(members) > 1]

    def save(self, output_path) -> None:
        """
        Saves the underlying static floor fields, one file per exit id.

        :param output_path: passed through to save_floor_field as output location
        :return:
        """
        for exit_id, ff in self.static_ff.items():
            save_floor_field(ff, output_path, 'static_ff_{}.txt'.format(exit_id))