Skip to content

Commit

Permalink
Init Value based on positions
Browse files Browse the repository at this point in the history
Agents near the motivation door are more likely to receive a higher Value
  • Loading branch information
chraibi committed Oct 24, 2024
1 parent 03c3822 commit be8e8b8
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
18 changes: 12 additions & 6 deletions simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def profile_function(name: str) -> Iterator[None]:


def init_motivation_model(
_data: Dict[str, Any], ped_ids: List[int]
_data: Dict[str, Any],
ped_ids: List[int],
ped_positions: List[Point],
) -> mm.MotivationModel:
"""Init motivation model based on parsed strategy."""
width = _data["motivation_parameters"]["width"]
Expand All @@ -73,6 +75,12 @@ def init_motivation_model(
if not motivation_doors:
logging.info("json file does not contain any motivation door.")

door_point1 = (motivation_doors[0][0][0], motivation_doors[0][0][1])
door_point2 = (motivation_doors[0][1][0], motivation_doors[0][1][1])
x_door = 0.5 * (door_point1[0] + door_point2[0])
y_door = 0.5 * (door_point1[1] + door_point2[1])
motivation_door_center: Point = (x_door, y_door)

normal_v_0 = parse_normal_v_0(_data)
normal_time_gap = parse_normal_time_gap(_data)
choose_motivation_strategy = parse_motivation_strategy(_data)
Expand Down Expand Up @@ -100,6 +108,8 @@ def init_motivation_model(
number_high_value=int(_data["motivation_parameters"]["number_high_value"]),
nagents=number_agents,
agent_ids=ped_ids,
agent_positions=ped_positions,
motivation_door_center=motivation_door_center,
competition_decay_reward=competition_decay_reward,
competition_max=competition_max,
percent=percent,
Expand Down Expand Up @@ -409,10 +419,6 @@ def get_agent_positions(_data: Dict[str, Any]) -> Tuple[List[Point], int]:
return positions, num_agents


def print_hello(msg: object) -> None:
    """Print the string form of *msg* to stdout.

    Args:
        msg: Any object; it is converted with ``str`` by ``print`` itself.
    """
    # The original wrapped msg in a redundant f-string (f"{msg}");
    # print() already performs the str() conversion.
    print(msg)


def init_and_run_simulation(
_fps: int,
_time_step: float,
Expand Down Expand Up @@ -448,7 +454,7 @@ def init_and_run_simulation(
positions=positions,
exit_positions=exit_positions,
)
motivation_model = init_motivation_model(_data, ped_ids)
motivation_model = init_motivation_model(_data, ped_ids, positions)
x_door = 0.5 * (motivation_model.door_point1[0] + motivation_model.door_point2[0])
y_door = 0.5 * (motivation_model.door_point1[1] + motivation_model.door_point2[1])
motivation_door: Point = (x_door, y_door)
Expand Down
54 changes: 51 additions & 3 deletions src/motivation_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from matplotlib.figure import Figure

from .logger_config import log_debug
import math

Point: TypeAlias = Tuple[float, float]

Expand Down Expand Up @@ -91,7 +92,9 @@ class EVCStrategy(MotivationStrategy):
"""Motivation theory based on E.V.C (model4)."""

agent_ids: List[int] = field(default_factory=list)
agent_positions: List[Point] = (field(default_factory=list),)
pedestrian_value: Dict[int, float] = field(default_factory=dict)
motivation_door_center: Point = tuple()
width: float = 1.0
height: float = 1.0
max_reward: int = 0
Expand All @@ -107,6 +110,30 @@ class EVCStrategy(MotivationStrategy):
nagents: int = 10
evc: bool = True
alpha = 1.0
spatial_bias: float = 5.0 # To account for absolute distances
distance_scale: float = 2.0 # Scale factor for distance normalization

def calculate_exit_distance(self, pos: Point) -> float:
"""Calculate distance from a position to the door center.
Returns absolute distance in spatial units."""
dx = pos[0] - self.motivation_door_center[0]
dy = pos[1] - self.motivation_door_center[1]
return math.sqrt(dx * dx + dy * dy)

def get_high_value_probability(self, pos: Point) -> float:
"""Calculate probability of being high value based on position.
Returns higher probability for positions closer to door."""
distance = self.calculate_exit_distance(pos)
# Convert distance to probability using exponential decay
# Scale the distance to control the rate of probability decay
probability = math.exp(-self.spatial_bias * (distance / self.distance_scale))
return probability

@staticmethod
def get_derived_seed(base_seed: int, operation_id: int) -> int:
"""Create a new seed based on the base seed and operation type."""
return base_seed * 1000 + operation_id

def __post_init__(self) -> None:
"""Initialize array pedestrian_value with random values in min max interval."""
Expand All @@ -119,17 +146,38 @@ def __post_init__(self) -> None:
)
self.number_high_value = self.nagents

high_value_agents = set(random.sample(self.agent_ids, self.number_high_value))
# Calculate probabilities for each agent based on position
agent_probabilities = [
(agent_id, self.get_high_value_probability(pos))
for agent_id, pos in zip(self.agent_ids, self.agent_positions)
]
# Sort agents by their probability of being high value
sorted_agents = sorted(
agent_probabilities,
key=lambda x: x[1]
* (1 + random.uniform(0, 0.2)), # Multiplicative randomness
reverse=True,
)

# Take the top number_high_value agents as high value agents
high_value_agents = set(
agent_id for agent_id, _ in sorted_agents[: self.number_high_value]
)
# high_value_agents = set(random.sample(self.agent_ids, self.number_high_value))
for n in self.agent_ids:
if n in high_value_agents:
# This agent gets a high value
self.pedestrian_value[n] = self.value(
self.min_value_high, self.max_value_high
self.min_value_high,
self.max_value_high,
self.get_derived_seed(self.seed, n),
)
else:
# This agent gets a low value
self.pedestrian_value[n] = self.value(
self.min_value_low, self.max_value_low
self.min_value_low,
self.max_value_low,
self.get_derived_seed(self.seed, n),
)

@staticmethod
Expand Down

0 comments on commit be8e8b8

Please sign in to comment.