-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstate.py
More file actions
95 lines (76 loc) · 2.73 KB
/
state.py
File metadata and controls
95 lines (76 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""
Thread-safe shared mutable state for STAMP.
All mutable global state lives here so that dependencies are explicit.
Other modules read/write through this module's attributes.
"""
import threading
import time as _time
from cache_manager import DatasetCache
# Background-job tracking.
# PROGRESS maps job_id -> progress record dict; records are created and
# updated by _progress_set() in this module.
PROGRESS = {}
# RESULTS maps job_id -> completed result payload dict. _cleanup_old_jobs()
# drops the entry here whenever it purges the matching PROGRESS record.
RESULTS = {}
# Lock taken by _progress_set() around every PROGRESS update;
# _cleanup_old_jobs() expects its caller to already hold it.
# NOTE(review): nothing in this file takes the lock when writing RESULTS or
# the plot globals below -- confirm that writers elsewhere synchronise.
PROG_LOCK = threading.Lock()
# Dataset cache (disk-backed, LRU, 24-hour TTL, 10 GB cap), built once at
# import time and shared by every importer of this module.
cache = DatasetCache(ttl_hours=24, max_cache_size_gb=10)
# Latest plot objects shared between route handlers. All start as None and
# are rebound by other modules through this module's attributes.
latest_surface_figure = None
latest_heatmap_figure = None
latest_spectrum_video_path = None
last_surface_plot_html = None
last_heatmap_plot_html = None
last_surface_fig_json = None
last_heatmap_fig_json = None
# Mutable list: importers that mutate it in place share the same object.
last_custom_bands = []
# NOTE(review): looks like this overlaps with latest_spectrum_video_path
# above -- confirm whether both names are still needed.
latest_spectrum_mp4_path = None
# NOTE(review): a "Per-token video temp paths" section header stood here, but
# no definition follows it in this file -- confirm whether that state was
# removed or still needs to be added.
# Statuses after which a job record stops changing and may be purged.
_TERMINAL_STATUSES = ("done", "error")


def _progress_set(job_id, *, percent=None, message=None, status=None,
                  reset=False, stage=None, processed_integrations=None,
                  total_integrations=None):
    """Create or update a background-job progress record (thread-safe).

    Only the fields passed as non-None are modified; everything else keeps
    its previous value.

    Args:
        job_id: Key of the record in PROGRESS.
        percent: Completion percentage. Clamped to [0, 99] unless this same
            call also passes status="done", so a running job never reports
            itself as complete.
        message: Human-readable progress message.
        status: New status string. "done" and "error" are terminal: they
            stamp "finished_at" on the record, and "done" additionally
            triggers a purge of expired jobs.
        reset: When true, discard any existing record and start a fresh one.
        stage: Name of the current processing stage.
        processed_integrations: Integrations processed so far (stored as int).
        total_integrations: Total integrations expected (stored as int).

    Returns:
        A shallow snapshot copy of the record, safe to read without the lock.
    """
    with PROG_LOCK:
        rec = PROGRESS.get(job_id)
        if reset or rec is None:
            rec = {
                "status": "running",
                "percent": 0.0,
                "message": "Starting",
                "started_at": _time.time(),
                "stage": "queued",
                "processed_integrations": 0,
                "total_integrations": None,
            }
            PROGRESS[job_id] = rec
        if percent is not None:
            p = float(percent)
            if status != "done":
                # Reserve 100% for the call that actually completes the job.
                p = max(0.0, min(99.0, p))
            rec["percent"] = p
        if message is not None:
            rec["message"] = message
        if status is not None:
            rec["status"] = status
            if status in _TERMINAL_STATUSES:
                # Retention is measured from completion, not from start;
                # otherwise a job that ran longer than _MAX_JOB_AGE would be
                # purged (together with its result) by the cleanup below in
                # the very call that marks it done.
                rec["finished_at"] = _time.time()
        if stage is not None:
            rec["stage"] = stage
        if processed_integrations is not None:
            rec["processed_integrations"] = int(processed_integrations)
        if total_integrations is not None:
            rec["total_integrations"] = int(total_integrations)
        if status == "done":
            # Piggy-back garbage collection on job completion; the lock is
            # already held, as _cleanup_old_jobs() requires.
            _cleanup_old_jobs()
        return rec.copy()


_MAX_JOB_AGE = 3600  # seconds (1 hour) a finished job is retained


def _cleanup_old_jobs():
    """Remove completed/errored jobs that finished over _MAX_JOB_AGE seconds ago.

    Age is measured from the record's "finished_at" timestamp. Records that
    lack one (for example, records written directly into PROGRESS) fall back
    to "started_at", and records with neither timestamp are never considered
    stale. The matching RESULTS entry is dropped along with each purged
    record.

    Must be called while PROG_LOCK is held.
    """
    now = _time.time()
    stale = [
        jid for jid, rec in PROGRESS.items()
        if rec.get("status") in _TERMINAL_STATUSES
        and now - rec.get("finished_at", rec.get("started_at", now)) > _MAX_JOB_AGE
    ]
    # Collected first so the dict is not mutated while being iterated.
    for jid in stale:
        PROGRESS.pop(jid, None)
        RESULTS.pop(jid, None)