Skip to content

Commit

Permalink
Update simulation with new variation mechanism
Browse files Browse the repository at this point in the history
1. Generate variations
2. than run simulation

use --help for help
  • Loading branch information
chraibi committed Oct 28, 2024
1 parent 356c089 commit 9840f3e
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 38 deletions.
133 changes: 133 additions & 0 deletions generage_variations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import json
import logging
import pathlib
from typing import Any, Dict, List, Union
import typer
from datetime import datetime

def get_nested_value(config: Dict, param_path: str) -> Any:
"""Get value from nested dictionary using dot notation."""
keys = param_path.split('/')
current = config
for key in keys:
current = current[key]
return current

def set_nested_value(config: Dict, param_path: str, value: Any) -> None:
"""Set value in nested dictionary using dot notation."""
keys = param_path.split('/')
current = config
for key in keys[:-1]:
current = current[key]
current[keys[-1]] = value

def create_parameter_variations(
base_config: Dict,
param_path: str,
values: List[Union[float, int, str]],
description: str = ""
) -> List[Dict]:
"""
Create variations of the configuration by varying a single parameter.
Args:
base_config: The base configuration dictionary
param_path: Path to the parameter using '/' notation (e.g., 'motivation_parameters/height')
values: List of values to try for this parameter
description: Optional description of what these variations represent
Returns:
List of variation dictionaries
"""
original_value = get_nested_value(base_config, param_path)
variations = []

for value in values:
variation = {
"name": f"{param_path.replace('/', '_')}_{value}",
"description": description,
"original_value": original_value,
"parameters": {
param_path: value
}
}
variations.append(variation)

return variations

def save_variations(variations: List[Dict], output_path: pathlib.Path) -> None:
"""Save variations to a JSON file with proper formatting."""
with open(output_path, "w", encoding="utf8") as f:
json.dump(variations, f, indent=4)

def main(
inifile: pathlib.Path = typer.Option(
pathlib.Path("files/inifile.json"),
help="Path to the initial configuration file",
),
param: str = typer.Option(
None,
help="Parameter to vary (using '/' notation, e.g., 'motivation_parameters/height')",
),
values: str = typer.Option(
None,
help="Comma-separated list of values to try (e.g., '0.5,1.0,1.5')",
),
description: str = typer.Option(
"",
help="Optional description of these variations",
),
output: pathlib.Path = typer.Option(
None,
help="Output path for variations file (default: variations_PARAMETER_TIMESTAMP.json)",
),
) -> None:
"""Generate parameter variations for simulation runs."""
if not param or not values:
print("Usage example:")
print(" python generate_variations.py --inifile files/inifile.json \\\n"
" --param motivation_parameters/height \\\n"
" --values 0.5,1.0,1.5 \\\n"
" --description 'Testing different heights'")
return

# Load base configuration
with open(inifile, "r", encoding="utf8") as f:
base_config = json.load(f)

# Parse values (handle both float and int)
try:
parsed_values = []
for v in values.split(','):
v = v.strip()
if '.' in v:
parsed_values.append(float(v))
else:
parsed_values.append(int(v))
except ValueError as e:
print(f"Error parsing values: {e}")
return

# Generate variations
variations = create_parameter_variations(
base_config,
param,
parsed_values,
description
)

# Determine output path
if output is None:
param_name = param.replace('/', '_')
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output = pathlib.Path(f"variations_{param_name}_{timestamp}.json")

# Save variations
save_variations(variations, output)
print(f"Created {len(variations)} variations in {output}")
print("\nVariations summary:")
for var in variations:
print(f"- {var['name']}: {var['parameters'][param]}")

if __name__ == "__main__":
typer.run(main)
185 changes: 147 additions & 38 deletions simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# Copyright © 2012-2022 Forschungszentrum Jülich GmbH
# SPDX-License-Identifier: LGPL-3.0-or-later

from datetime import datetime
import _io
import contextlib
import csv
Expand Down Expand Up @@ -231,7 +233,9 @@ def process_agent(
motivation_i = motivation_model.motivation_strategy.motivation(params)
agent_value = motivation_model.motivation_strategy.get_value(agent_id=agent.id)
if motivation_i > 1:
logging.error(f"{simulation.iteration_count()}: {agent.id}: {motivation_i = }")
logging.error(
f"Motivation too high. Count: {simulation.iteration_count()}. Agent: {agent.id}. Motivation: {motivation_i = }"
)

v_0, time_gap = motivation_model.calculate_motivation_state(motivation_i, agent.id)

Expand Down Expand Up @@ -296,7 +300,7 @@ def run_simulation_loop(
while (
simulation.elapsed_time() < simulation_time and simulation.agent_count() > 0
):
print(f"time: {simulation.elapsed_time():.2f}", end="\r")
print(f"Elapsed time: {simulation.elapsed_time():.2f}", end="\r")

if simulation.iteration_count() % every_nth_frame == 0:
for agent in simulation.agents():
Expand Down Expand Up @@ -441,7 +445,7 @@ def init_and_run_simulation(
motivation_file = _trajectory_path.with_name(
_trajectory_path.stem + "_motivation.csv"
)
logging.info(f"{motivation_file.stem = }")
logging.info(f"Motivation file: {motivation_file}")
simulation = init_simulation(
_data, _time_step, _fps, _trajectory_path, from_file=True
)
Expand All @@ -462,7 +466,6 @@ def init_and_run_simulation(
y_door = 0.5 * (motivation_model.door_point1[1] + motivation_model.door_point2[1])
motivation_door: Point = (x_door, y_door)
logging.info(f"Running simulation for {len(ped_ids)} agents:")
logging.info(f"{motivation_model.motivation_strategy.width = }")
start_time = time.time()
run_simulation_loop(
simulation=simulation,
Expand All @@ -483,7 +486,7 @@ def init_and_run_simulation(
logging.info(
f"Simulation completed after {simulation.iteration_count()} iterations"
)
logging.info(f"simulation time: {simulation.iteration_count()*_time_step:.2f} [s]")
logging.info(f"Simulation time: {simulation.iteration_count()*_time_step:.2f} [s]")
# logging.info(f"Trajectory: {_trajectory_path}")
return float(simulation.iteration_count() * _time_step)

Expand All @@ -509,56 +512,162 @@ def start_simulation(config_path: str, output_path: str) -> float:
return evac_time


def load_variations(variations_path: pathlib.Path) -> List[Dict]:
"""Load parameter variations from a JSON file."""
if not variations_path.exists():
raise FileNotFoundError(f"Variations file not found: {variations_path}")

with open(variations_path, "r", encoding="utf8") as f:
variations = json.load(f)

# Validate variations format
for var in variations:
if "parameters" not in var:
raise ValueError(f"Missing 'parameters' in variation: {var}")

return variations


def modify_and_save_config(
base_config: float, modification_dict: Dict[str, Any], new_config_path: str
base_config: Dict, variation: Dict[str, any], output_path: pathlib.Path
) -> None:
"""Modify base configuration and save as a new JSON file."""
config = json.loads(json.dumps(base_config)) # Deep copy
for key, value in modification_dict.items():
nested_keys = key.split("/")
last_key = nested_keys.pop()
temp = config
for nk in nested_keys:
temp = temp[nk]
temp[last_key] = value
with open(new_config_path, "w", encoding="utf8") as f:
json.dump(config, f, ensure_ascii=False, indent=4)
"""
Modify base configuration with variation parameters and save to new file.
Args:
base_config: Original configuration dictionary
variation: Dictionary with parameter paths and their new values
output_path: Where to save the modified configuration
"""
# Create a deep copy of the base config
new_config = json.loads(json.dumps(base_config))

# Apply all parameter changes
for param_path, value in variation.items():
keys = param_path.split("/")
current = new_config
for key in keys[:-1]:
current = current[key]
current[keys[-1]] = value

# Save the modified configuration
with open(output_path, "w", encoding="utf8") as f:
json.dump(new_config, f, indent=4)


def main(
inifile: pathlib.Path = typer.Option(
pathlib.Path("files/inifile.json"),
help="Path to the initial configuration file",
),
variations_file: pathlib.Path = typer.Option(
..., # Make this required
help="Path to the variations file",
),
output_dir: pathlib.Path = typer.Option(
pathlib.Path("files/variations"),
help="Directory for output files",
),
) -> None:
"""Implement Main function. Create variations and start simulations."""
"""Run simulations with parameter variations."""
init_logger()
logging.info(f"Base config = {inifile}")
with open(inifile, "r", encoding="utf8") as f:
base_config = json.load(f)

variations = [
{"motivation_parameters/width": 5.0, "motivation_parameters/seed": 10000.0},
{"motivation_parameters/width": 2.0, "motivation_parameters/seed": 300.0},
]

output_dir = pathlib.Path("files/variations")
# Load base configuration
logging.info(f"Loading base configuration from {inifile}")
try:
with open(inifile, "r", encoding="utf8") as f:
base_config = json.load(f)
except FileNotFoundError:
logging.error(f"Base configuration file not found: {inifile}")
raise typer.Exit(code=1)

# Load variations
logging.info(f"Loading variations from {variations_file}")
try:
variations = load_variations(variations_file)
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
logging.error(f"Error loading variations: {e}")
raise typer.Exit(code=1)

# Create output directory
output_dir.mkdir(parents=True, exist_ok=True)

variations_file = output_dir / "variations.json"
with open(variations_file, "w") as f:
json.dump(variations, f, indent=4)
# Save a copy of the variations used for this run
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
run_info = {
"timestamp": timestamp,
"base_config": str(inifile),
"variations_file": str(variations_file),
"variations": variations,
}

run_info_file = output_dir / f"run_info_{timestamp}.json"
with open(run_info_file, "w") as f:
json.dump(run_info, f, indent=4)

# Run simulations for each variation
results = []
total_variations = len(variations)

for i, variation in enumerate(variations, start=1):
logging.info(f"running variation {i:03d}: {variation}")
new_config_path = f"{output_dir}/{inifile.stem}_variation_{i:03d}.json"
output_path = f"files/{inifile.stem}_variation_{i:03d}.sqlite"
logging.info(f"{output_path = }")
# Modify and save the new configuration
modify_and_save_config(base_config, variation, new_config_path)
var_name = variation.get("name", f"variation_{i:03d}")
var_desc = variation.get("description", "")

logging.info(f"\nRunning variation {i}/{total_variations}: {var_name}")
if var_desc:
logging.info(f"Description: {var_desc}")

evac_time = start_simulation(new_config_path, output_path)
logging.info(f"Variation {i:03d}: {evac_time = }")
# Log parameter changes
for param, value in variation["parameters"].items():
original = variation.get("original_value", "unknown")
logging.info(f" >> {param}: {original} -> {value}")

# Create variation-specific filenames
new_config_path = output_dir / f"{inifile.stem}_{var_name}.json"
output_path = output_dir / f"{inifile.stem}_{var_name}.sqlite"

# Modify and save the new configuration
modify_and_save_config(base_config, variation["parameters"], new_config_path)

# Run simulation
try:
evac_time = start_simulation(new_config_path, output_path)
status = "completed"
except Exception as e:
logging.error(f"Error in simulation: {e}")
evac_time = None
status = "failed"

# Store results
result = {
"variation_name": var_name,
"description": var_desc,
"parameters": variation["parameters"],
"evac_time": evac_time,
"status": status,
"config_file": str(new_config_path),
"output_file": str(output_path),
}
results.append(result)

logging.info(f"Status: {status}")
if evac_time is not None:
logging.info(f"Evacuation time: {evac_time:.2f} [s]")

# Save all results
results_file = output_dir / f"results_{timestamp}.json"
with open(results_file, "w") as f:
json.dump(results, f, indent=4)

logging.info(f"\nSimulation batch completed. Results saved to {results_file}")

# Print summary
completed = sum(1 for r in results if r["status"] == "completed")
failed = sum(1 for r in results if r["status"] == "failed")
logging.info("\nSummary:")
logging.info(f"Total variations: {total_variations}")
logging.info(f"Completed: {completed}")
logging.info(f"Failed: {failed}")


if __name__ == "__main__":
Expand Down

0 comments on commit 9840f3e

Please sign in to comment.