77 _edges_file_identity, _edges_repo_identity, _edges_url_identity,
88 _edges_delegations, _edges_soft_ops
99
10- Queue: SQLite (~/.flex/queue.db) table claude_code_pending
1110Cell: resolved via flex.registry
1211
13- Tier 0 (every 2s): Queue → chunk-atom tables + embeddings
12+ Tier 0 (every 2s): stat() scan → sync changed JSONLs → embed
1413"""
1514
1615import hashlib
@@ -914,80 +913,62 @@ def sync_session_messages(session_id: str, conn: sqlite3.Connection,
914913 return inserted
915914
916915
917- def process_queue (conn : sqlite3 .Connection ) -> dict :
918- """Process queue: read session_ids, sync each from JSONL."""
919- qconn = sqlite3 .connect (str (QUEUE_DB ), timeout = 5 )
920- qconn .execute ("PRAGMA journal_mode=WAL" )
916+ def scan_sessions (conn : sqlite3 .Connection , size_cache : dict ) -> dict :
917+ """Scan all JSONLs by file size, sync only those that grew.
921918
922- rows = qconn .execute (
923- "SELECT session_id FROM claude_code_pending ORDER BY ts LIMIT 100"
924- ).fetchall ()
919+ Replaces both process_queue() and startup_backfill(). No hooks, no queue,
920+ no existence check. Pure stat()-based polling — the Filebeat pattern.
925921
926- if not rows :
927- qconn .close ()
928- return {'processed' : 0 , 'embedded' : 0 }
922+ Args:
923+ conn: Open cell connection.
924+ size_cache: Mutable dict {session_id: last_synced_size}. Persisted in
925+ memory across ticks. On first call, empty dict triggers
926+ full initial scan.
929927
930- embedded = 0
931- session_ids = [r [0 ] for r in rows ]
928+ Returns:
929+ dict with 'synced' (sessions touched) and 'chunks' (new chunks inserted).
930+ """
931+ synced = 0
932+ chunks = 0
932933
933- for session_id in session_ids :
934+ for jsonl in CLAUDE_PROJECTS . rglob ( "*.jsonl" ) :
934935 try :
935- embedded += sync_session_messages (session_id , conn )
936- # Reactive warmup: reclassify after every sync.
937- # Sessions grow as they run — a session classified as warmup
938- # at message 2 may have 200 messages by the next sync.
939- _update_warmup (conn , session_id )
940- except Exception as e :
941- print (f"[worker] Error syncing { session_id [:8 ]} : { e } " , file = sys .stderr )
942-
943- conn .commit ()
944-
945- placeholders = ',' .join ('?' * len (session_ids ))
946- qconn .execute (
947- f"DELETE FROM claude_code_pending WHERE session_id IN ({ placeholders } )" ,
948- session_ids
949- )
950- qconn .commit ()
951- qconn .close ()
952-
953- return {'processed' : len (session_ids ), 'embedded' : embedded }
954-
936+ current_size = jsonl .stat ().st_size
937+ except (FileNotFoundError , OSError ):
938+ continue
955939
956- def startup_backfill ( conn : sqlite3 . Connection , commit_every : int = 50 ):
957- """Sync any sessions on disk that aren't in the cell.
940+ session_id = jsonl . stem
941+ last_size = size_cache . get ( session_id , - 1 )
958942
959- Uses existence check (not mtime) so nothing slips through.
960- Idempotent — sync_session_messages uses INSERT OR IGNORE.
961- """
962- print ("[worker] Running startup backfill..." , file = sys .stderr )
963- already = {r [0 ] for r in conn .execute ("SELECT source_id FROM _raw_sources" )}
943+ if current_size == last_size :
944+ continue # unchanged — skip
964945
965- backfilled = 0
966- sessions_since_commit = 0
967- for jsonl in CLAUDE_PROJECTS .rglob ("*.jsonl" ):
968- if jsonl .stem in already :
969- continue
946+ # New or grown file — sync it
970947 try :
971- count = sync_session_messages (jsonl .stem , conn )
948+ count = sync_session_messages (session_id , conn )
949+ _update_warmup (conn , session_id )
950+ size_cache [session_id ] = current_size
972951 if count > 0 :
973- backfilled += count
974- _update_warmup (conn , jsonl .stem )
975- sessions_since_commit += 1
976- if sessions_since_commit >= commit_every :
977- conn .commit ()
978- print (f"[worker] Backfill progress: { backfilled } chunks" ,
979- file = sys .stderr )
980- sessions_since_commit = 0
952+ synced += 1
953+ chunks += count
981954 except Exception as e :
982- print (f"[worker] backfill skip { jsonl . name } : { e } " , file = sys .stderr )
955+ print (f"[worker] sync error { session_id [: 12 ] } : { e } " , file = sys .stderr )
983956
984- if sessions_since_commit > 0 :
957+ if chunks > 0 :
985958 conn .commit ()
986959
987- if backfilled > 0 :
988- print (f"[worker] Backfilled { backfilled } chunks" , file = sys .stderr )
989- else :
990- print ("[worker] No backfill needed" , file = sys .stderr )
960+ return {'synced' : synced , 'chunks' : chunks }
961+
962+
963+ # Legacy compat — mcp_server._background_indexer imports this
964+ def process_queue (conn : sqlite3 .Connection ) -> dict :
965+ """Legacy queue drain — delegates to scan_sessions."""
966+ stats = scan_sessions (conn , _global_size_cache )
967+ return {'processed' : stats ['synced' ], 'embedded' : stats ['chunks' ]}
968+
969+
970+ # Global size cache for legacy process_queue() callers (mcp_server)
971+ _global_size_cache : dict = {}
991972
992973
993974def bootstrap_claude_code_cell () -> Path :
@@ -1534,13 +1515,9 @@ def daemon_loop(interval=2):
15341515 conn .execute ("PRAGMA synchronous=NORMAL" )
15351516 conn .execute ("PRAGMA busy_timeout=30000" )
15361517
1537- # Ensure queue tables exist — hooks also create these inline, but the
1538- # worker starts before any hook fires, so we must create them here.
1518+ # Ensure docpac queue table exists (claude_code no longer uses queue)
15391519 qconn = sqlite3 .connect (str (QUEUE_DB ), timeout = 5 )
15401520 qconn .execute ("PRAGMA journal_mode=WAL" )
1541- qconn .execute ("""CREATE TABLE IF NOT EXISTS claude_code_pending (
1542- session_id TEXT NOT NULL, ts INTEGER NOT NULL, payload TEXT NOT NULL)""" )
1543- qconn .execute ("CREATE INDEX IF NOT EXISTS idx_claude_code_ts ON claude_code_pending(ts)" )
15441521 qconn .execute ("""CREATE TABLE IF NOT EXISTS pending (
15451522 path TEXT PRIMARY KEY, ts INTEGER)""" )
15461523 qconn .commit ()
@@ -1554,27 +1531,26 @@ def daemon_loop(interval=2):
15541531 if soma_ensure_tables :
15551532 soma_ensure_tables (conn )
15561533
1557- # Startup backfill for sessions missed during outage
1558- startup_backfill (conn )
1559-
15601534 print (" Docpac: incremental indexing enabled" , file = sys .stderr )
15611535
1562- BACKFILL_INTERVAL = 30 # 30s — existence check, not mtime, so it's cheap
15631536 ENRICHMENT_INTERVAL = 30 * 60 # 30 minutes — graph, fingerprints, repo_project
15641537 GRAPH_STALENESS_THRESHOLD = 50 # sessions since last graph build
15651538
1566- last_backfill = time .time ()
15671539 last_soma_heal = time .time ()
15681540 last_enrichment = 0 # run enrichment immediately after first startup
15691541
1542+ # Size cache — empty dict triggers full initial scan on first tick
1543+ size_cache : dict = {}
1544+
15701545 while True :
1546+ # Tier 0: stat() scan — sync any JSONLs that grew (replaces queue + backfill)
15711547 try :
1572- stats = process_queue (conn )
1573- if stats ['processed ' ] > 0 :
1574- print (f"[worker] sessions ={ stats ['processed ' ]} emb ={ stats ['embedded ' ]} " ,
1548+ stats = scan_sessions (conn , size_cache )
1549+ if stats ['synced ' ] > 0 :
1550+ print (f"[worker] synced ={ stats ['synced ' ]} chunks ={ stats ['chunks ' ]} " ,
15751551 file = sys .stderr )
15761552 except Exception as e :
1577- print (f"[worker] Error : { e } " , file = sys .stderr )
1553+ print (f"[worker] Scan error : { e } " , file = sys .stderr )
15781554
15791555 # Sweep NULL embeddings every tick — catches interrupted flex init,
15801556 # failed embeds, or any other orphaned chunks. Small batch (64) so
@@ -1597,14 +1573,6 @@ def daemon_loop(interval=2):
15971573 except Exception as e :
15981574 print (f"[docpac] Error: { e } " , file = sys .stderr )
15991575
1600- # Periodic scan — catch any unindexed sessions (30s cycle)
1601- if time .time () - last_backfill > BACKFILL_INTERVAL :
1602- try :
1603- startup_backfill (conn )
1604- last_backfill = time .time ()
1605- except Exception as e :
1606- print (f"[worker] Backfill error: { e } " , file = sys .stderr )
1607-
16081576 # SOMA identity heal (24h cycle)
16091577 if soma_heal and time .time () - last_soma_heal > 24 * 3600 :
16101578 try :
@@ -1623,23 +1591,6 @@ def daemon_loop(interval=2):
16231591 print (f"[worker] Enrichment error: { e } " , file = sys .stderr )
16241592 last_enrichment = time .time () # don't retry immediately
16251593
1626- # Queue depth check
1627- try :
1628- qconn = sqlite3 .connect (str (QUEUE_DB ), timeout = 5.0 )
1629- cc_depth = qconn .execute (
1630- "SELECT COUNT(*) FROM claude_code_pending"
1631- ).fetchone ()[0 ]
1632- dp_depth = qconn .execute (
1633- "SELECT COUNT(*) FROM pending"
1634- ).fetchone ()[0 ]
1635- qconn .close ()
1636- if cc_depth > 500 :
1637- print (f"[worker] WARNING: claude_code queue depth { cc_depth } " , file = sys .stderr )
1638- if dp_depth > 100 :
1639- print (f"[docpac] WARNING: docpac queue depth { dp_depth } " , file = sys .stderr )
1640- except Exception :
1641- pass # queue may not exist yet on first boot
1642-
16431594 time .sleep (interval )
16441595
16451596
0 commit comments