Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def create_app(config_class=Config):
# Register simulation process cleanup function (ensure all simulation processes terminate on server shutdown)
from .services.simulation_runner import SimulationRunner
SimulationRunner.register_cleanup()
SimulationRunner.reconnect_orphaned_simulations()
if should_log_startup:
logger.info("Simulation process cleanup function registered")

Expand Down
2 changes: 2 additions & 0 deletions backend/app/api/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ def generate_ontology():
})

except Exception as e:
logger.error(f"Ontology generation failed: {e}")
logger.error(traceback.format_exc())
return jsonify({
"success": False,
"error": str(e),
Expand Down
33 changes: 23 additions & 10 deletions backend/app/api/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,40 +1532,44 @@ def start_simulation():

force_restarted = False

# Intelligently handle status: if preparation work is complete, reset status to ready
if state.status != SimulationStatus.READY:
# Check if preparation is complete
is_prepared, prepare_info = _check_simulation_prepared(simulation_id)

if is_prepared:
# Preparation work complete, verify if simulation is not already running
# Check if simulation process is really running
if state.status == SimulationStatus.RUNNING:
# Check if simulation process is really running
run_state = SimulationRunner.get_run_state(simulation_id)
if run_state and run_state.runner_status.value == "running":
# Process is indeed running
if force:
# Force mode:Stop runningSimulation
logger.info(f"Force mode:Stop runningSimulation {simulation_id}")
logger.info(f"Force mode: stopping running simulation {simulation_id}")
try:
SimulationRunner.stop_simulation(simulation_id)
except Exception as e:
logger.warning(f"Warning when stopping simulation: {str(e)}")
else:
# Ensure state.json reflects running status
manager._save_simulation_state(state)
return jsonify({
"success": False,
"error": f"Simulation is running. Please call /stop first or use force=true to force restart."
}), 400

# If force mode,Clean runtime logs
# If force mode, clean runtime logs and Neo4j graph data
if force:
logger.info(f"Force mode: cleaning simulation runtime files for {simulation_id}")
cleanup_result = SimulationRunner.cleanup_simulation_logs(simulation_id)
cleanup_storage = current_app.extensions.get('neo4j_storage')
cleanup_graph_id = state.graph_id
cleanup_result = SimulationRunner.cleanup_simulation_logs(
simulation_id,
storage=cleanup_storage,
graph_id=cleanup_graph_id
)
if not cleanup_result.get("success"):
logger.warning(f"Warning when cleaning logs: {cleanup_result.get('errors')}")
force_restarted = True

# Process does not exist or has ended,Reset status to ready
# Process does not exist or has ended, reset status to ready
logger.info(f"Simulation {simulation_id} preparation complete, resetting status to ready (previous status: {state.status.value})")
state.status = SimulationStatus.READY
manager._save_simulation_state(state)
Expand Down Expand Up @@ -1595,13 +1599,22 @@ def start_simulation():

logger.info(f"Enable knowledge graph memory update: simulation_id={simulation_id}, graph_id={graph_id}")

# Get GraphStorage for graph memory update
storage = None
if enable_graph_memory_update:
storage = current_app.extensions.get('neo4j_storage')
if not storage:
logger.warning("GraphStorage not available, graph memory update will be disabled")
enable_graph_memory_update = False

# Start simulation
run_state = SimulationRunner.start_simulation(
simulation_id=simulation_id,
platform=platform,
max_rounds=max_rounds,
enable_graph_memory_update=enable_graph_memory_update,
graph_id=graph_id
graph_id=graph_id,
storage=storage
)

# Update simulation status
Expand Down
6 changes: 6 additions & 0 deletions backend/app/services/ontology_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
if "analysis_summary" not in result:
result["analysis_summary"] = ""

# Filter out malformed entity types (missing 'name' key)
result["entity_types"] = [e for e in result["entity_types"] if "name" in e]

# Validate entity types
for entity in result["entity_types"]:
if "attributes" not in entity:
Expand All @@ -275,6 +278,9 @@ def _validate_and_process(self, result: Dict[str, Any]) -> Dict[str, Any]:
if len(entity.get("description", "")) > 100:
entity["description"] = entity["description"][:97] + "..."

# Filter out malformed edge types (missing 'name' key)
result["edge_types"] = [e for e in result["edge_types"] if "name" in e]

# Validate relationship types
for edge in result["edge_types"]:
if "source_targets" not in edge:
Expand Down
159 changes: 158 additions & 1 deletion backend/app/services/simulation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,149 @@ class SimulationRunner:
# Graph memory update configuration
_graph_memory_enabled: Dict[str, bool] = {} # simulation_id -> enabled

@classmethod
def reconnect_orphaned_simulations(cls):
    """
    On backend startup, re-attach to simulations whose persisted
    run_state.json still claims runner_status='running'.

    For each simulation directory under RUN_STATE_DIR:
      - If there is no run_state.json, or the saved status is not RUNNING,
        or no PID was recorded, skip it.
      - If the recorded PID is no longer alive, finalize the run state as
        STOPPED so the UI does not show a phantom "running" simulation.
      - If the PID is alive, restore the in-memory run state, best-effort
        re-create the graph-memory updater (reading graph_id from the
        simulation's state.json), and start a daemon monitor thread so
        run_state.json keeps being updated.

    Errors for one simulation are logged and do not prevent reconnection
    of the others.
    """
    if not os.path.exists(cls.RUN_STATE_DIR):
        return

    for sim_id in os.listdir(cls.RUN_STATE_DIR):
        state_file = os.path.join(cls.RUN_STATE_DIR, sim_id, "run_state.json")
        if not os.path.exists(state_file):
            continue

        try:
            state = cls._load_run_state(sim_id)
            if not state or state.runner_status != RunnerStatus.RUNNING:
                continue

            pid = state.process_pid
            if not pid:
                continue

            # Liveness probe: signal 0 checks existence/permission without
            # delivering a signal.
            # NOTE(review): POSIX semantics assumed. PermissionError means
            # "alive but owned by someone else"; since our child processes
            # run as the backend user, that indicates PID reuse and is
            # deliberately treated as dead — confirm for the deployment.
            try:
                os.kill(pid, 0)
            except (ProcessLookupError, PermissionError):
                # Process died while the backend was down — finalize state.
                state.runner_status = RunnerStatus.STOPPED
                state.twitter_running = False
                state.reddit_running = False
                state.completed_at = datetime.now().isoformat()
                state.error = "Process died while backend was restarting"
                cls._save_run_state(state)
                logger.info(f"Marked dead simulation as stopped: {sim_id} (pid {pid})")
                continue

            # Already being monitored (e.g. reconnect called twice).
            if sim_id in cls._monitor_threads:
                continue

            logger.info(f"Reconnecting to orphaned simulation: {sim_id} (pid {pid})")

            cls._run_states[sim_id] = state

            # Best-effort: re-create the GraphMemoryUpdater from the
            # graph_id persisted in the simulation's state.json.
            graph_id = None
            state_json = os.path.join(cls.RUN_STATE_DIR, sim_id, "state.json")
            if os.path.exists(state_json):
                try:
                    with open(state_json, 'r', encoding='utf-8') as f:
                        sim_state = json.load(f)
                    graph_id = sim_state.get("graph_id")
                except Exception:
                    # Unreadable/corrupt state.json — continue without
                    # graph memory rather than abort the reconnect.
                    pass

            if graph_id:
                try:
                    from ..storage import Neo4jStorage
                    from ..config import Config
                    storage = Neo4jStorage(Config.NEO4J_URI, Config.NEO4J_USER, Config.NEO4J_PASSWORD)
                    GraphMemoryManager.create_updater(sim_id, graph_id, storage)
                    cls._graph_memory_enabled[sim_id] = True
                    logger.info(f"Reconnected graph memory updater: {sim_id}, graph_id={graph_id}")
                except Exception as e:
                    logger.warning(f"Could not reconnect graph memory for {sim_id}: {e}")
                    cls._graph_memory_enabled[sim_id] = False

            monitor_thread = threading.Thread(
                target=cls._monitor_orphaned_simulation,
                args=(sim_id, pid),
                daemon=True
            )
            # Register BEFORE starting: the monitor's cleanup pops this
            # entry in its finally block, and a fast-exiting monitor could
            # otherwise pop before we insert, leaving a stale entry that
            # blocks any future reconnection for this simulation.
            cls._monitor_threads[sim_id] = monitor_thread
            monitor_thread.start()

        except Exception as e:
            logger.error(f"Failed to reconnect simulation {sim_id}: {e}")

@classmethod
def _monitor_orphaned_simulation(cls, simulation_id: str, pid: int):
    """
    Monitor a reconnected (orphaned) simulation process.

    Polls the recorded PID for liveness every 2 seconds while tailing the
    twitter/reddit actions.jsonl logs, persisting run_state.json each
    cycle. When the process exits, drains the remaining log entries,
    finalizes runner_status (COMPLETED if both platforms finished,
    otherwise STOPPED), and always deregisters the graph-memory updater
    and this thread's registry entry.

    Args:
        simulation_id: Simulation whose state directory is monitored.
        pid: OS process id recorded in run_state.json.
    """
    sim_dir = os.path.join(cls.RUN_STATE_DIR, simulation_id)
    twitter_log = os.path.join(sim_dir, "twitter", "actions.jsonl")
    reddit_log = os.path.join(sim_dir, "reddit", "actions.jsonl")

    state = None
    try:
        state = cls.get_run_state(simulation_id)
        if not state:
            # Nothing to monitor; finally still deregisters this thread.
            return

        # Start tailing from the current end of each log so actions that
        # were already processed before the backend restart are skipped.
        twitter_position = os.path.getsize(twitter_log) if os.path.exists(twitter_log) else 0
        reddit_position = os.path.getsize(reddit_log) if os.path.exists(reddit_log) else 0

        def is_alive():
            # Signal 0: existence check only. PermissionError is treated
            # as "not our process" (PID reused by another user) — see
            # reconnect logic for rationale.
            try:
                os.kill(pid, 0)
                return True
            except (ProcessLookupError, PermissionError):
                return False

        while is_alive():
            if os.path.exists(twitter_log):
                twitter_position = cls._read_action_log(
                    twitter_log, twitter_position, state, "twitter"
                )
            if os.path.exists(reddit_log):
                reddit_position = cls._read_action_log(
                    reddit_log, reddit_position, state, "reddit"
                )
            cls._save_run_state(state)
            time.sleep(2)

        # Process ended — drain whatever the logs gained since last poll.
        if os.path.exists(twitter_log):
            cls._read_action_log(twitter_log, twitter_position, state, "twitter")
        if os.path.exists(reddit_log):
            cls._read_action_log(reddit_log, reddit_position, state, "reddit")

        if state.twitter_completed and state.reddit_completed:
            state.runner_status = RunnerStatus.COMPLETED
        else:
            state.runner_status = RunnerStatus.STOPPED
        state.twitter_running = False
        state.reddit_running = False
        state.completed_at = datetime.now().isoformat()
        cls._save_run_state(state)
        logger.info(f"Orphaned simulation finished: {simulation_id}, status={state.runner_status.value}")

    except Exception as e:
        logger.error(f"Orphan monitor error for {simulation_id}: {e}")
        # Guard: state may be None if the lookup itself failed.
        if state is not None:
            state.runner_status = RunnerStatus.FAILED
            state.error = str(e)
            cls._save_run_state(state)
    finally:
        # Always deregister — including the early-return path above, which
        # previously skipped cleanup and left a stale _monitor_threads
        # entry that blocked future reconnection of this simulation.
        if cls._graph_memory_enabled.get(simulation_id, False):
            try:
                GraphMemoryManager.stop_updater(simulation_id)
            except Exception:
                # Best-effort shutdown; never let updater teardown mask
                # the monitor's own exit path.
                pass
        cls._graph_memory_enabled.pop(simulation_id, None)
        cls._monitor_threads.pop(simulation_id, None)

@classmethod
def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]:
"""Get run state"""
Expand Down Expand Up @@ -1098,7 +1241,7 @@ def get_agent_stats(cls, simulation_id: str) -> List[Dict[str, Any]]:
return result

@classmethod
def cleanup_simulation_logs(cls, simulation_id: str) -> Dict[str, Any]:
def cleanup_simulation_logs(cls, simulation_id: str, storage=None, graph_id: str = None) -> Dict[str, Any]:
"""
Clean up simulation run logs (for force restart)

Expand All @@ -1112,10 +1255,14 @@ def cleanup_simulation_logs(cls, simulation_id: str) -> Dict[str, Any]:
- reddit_simulation.db (simulation database)
- env_status.json (environment status)

If storage and graph_id are provided, also clears Neo4j graph data.

Note: Does not delete config files (simulation_config.json) and profile files

Args:
simulation_id: Simulation ID
storage: Optional GraphStorage instance for Neo4j cleanup
graph_id: Optional graph ID to clear in Neo4j

Returns:
Cleanup result information
Expand Down Expand Up @@ -1170,6 +1317,16 @@ def cleanup_simulation_logs(cls, simulation_id: str) -> Dict[str, Any]:
if simulation_id in cls._run_states:
del cls._run_states[simulation_id]

# Clean up Neo4j graph data if storage provided
if storage and graph_id:
try:
storage.delete_graph(graph_id)
cleaned_files.append(f"neo4j:graph:{graph_id}")
logger.info(f"Cleared Neo4j graph data: graph_id={graph_id}")
except Exception as e:
errors.append(f"Failed to clear Neo4j graph: {str(e)}")
logger.error(f"Failed to clear Neo4j graph data: {e}")

logger.info(f"Cleanup simulation logs completed: {simulation_id}, deleted files: {cleaned_files}")

return {
Expand Down
2 changes: 1 addition & 1 deletion backend/app/utils/llm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(

# Ollama context window size — prevents prompt truncation.
# Read from env OLLAMA_NUM_CTX, default 8192 (Ollama default is only 2048).
self._num_ctx = int(os.environ.get('OLLAMA_NUM_CTX', '8192'))
self._num_ctx = int(os.environ.get('OLLAMA_NUM_CTX', '32768'))

def _is_ollama(self) -> bool:
"""Check if we're talking to an Ollama server."""
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import axios from 'axios'

// Create axios instance
const service = axios.create({
baseURL: import.meta.env.VITE_API_BASE_URL || 'http://localhost:5001',
baseURL: import.meta.env.VITE_API_BASE_URL || '',
timeout: 300000, // 5 minute timeout (ontology generation may require longer time)
headers: {
'Content-Type': 'application/json'
Expand Down
28 changes: 26 additions & 2 deletions frontend/src/components/Step3Simulation.vue
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ const doStartSimulation = async () => {
const params = {
simulation_id: props.simulationId,
platform: 'parallel',
force: true, // Force restart
force: false,
enable_graph_memory_update: true // Enable dynamic graph update
}

Expand Down Expand Up @@ -684,9 +684,33 @@ watch(() => props.systemLogs?.length, () => {
})
})

onMounted(() => {
onMounted(async () => {
addLog('Step3 Simulation initialization')
if (props.simulationId) {
try {
const res = await getRunStatus(props.simulationId)
if (res.success && res.data) {
const status = res.data.runner_status
if (status === 'running') {
addLog('Simulation already running — resuming status polling')
runStatus.value = res.data
phase.value = 1
startStatusPolling()
startDetailPolling()
return
}
if (status === 'completed' || status === 'stopped') {
addLog(`Simulation ${status} — loading results`)
runStatus.value = res.data
phase.value = 2
emit('update-status', 'completed')
fetchRunStatusDetail()
return
}
}
} catch (e) {
// No running simulation found, safe to start
}
doStartSimulation()
}
})
Expand Down
6 changes: 6 additions & 0 deletions frontend/src/router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ import SimulationView from '../views/SimulationView.vue'
import SimulationRunView from '../views/SimulationRunView.vue'
import ReportView from '../views/ReportView.vue'
import InteractionView from '../views/InteractionView.vue'
import DashboardView from '../views/DashboardView.vue'

const routes = [
{
path: '/',
name: 'Home',
component: Home
},
{
path: '/dashboard',
name: 'Dashboard',
component: DashboardView
},
{
path: '/process/:projectId',
name: 'Process',
Expand Down
Loading