diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index 0447a784a..54b2d52f0 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -502,6 +502,7 @@ async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any return { name: { "processed": s.processed, + "requeue_count": getattr(s, "requeue_count", 0), "error_count": s.error_count, "errors": [{"message": e.message} for e in s.errors], } diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index d79f9092a..f98dd1b88 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -39,6 +39,7 @@ @dataclass class RequestQueueStats: processed: int = 0 + requeue_count: int = 0 error_count: int = 0 @@ -207,13 +208,18 @@ def _log_breaker_open_reenqueue_summary(self) -> None: @classmethod def _merge_request_stats( - cls, telemetry_id: str, processed: int = 0, error_count: int = 0 + cls, + telemetry_id: str, + processed: int = 0, + requeue_count: int = 0, + error_count: int = 0, ) -> None: if not telemetry_id: return with cls._request_stats_lock: stats = cls._request_stats_by_telemetry_id.setdefault(telemetry_id, RequestQueueStats()) stats.processed += processed + stats.requeue_count += requeue_count stats.error_count += error_count cls._request_stats_order.append(telemetry_id) if len(cls._request_stats_order) > cls._max_cached_stats: @@ -291,6 +297,14 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if wait > 0: await asyncio.sleep(wait) await self._vikingdb.enqueue_embedding_msg(embedding_msg) + self._merge_request_stats( + embedding_msg.telemetry_id, + requeue_count=1, + ) + get_request_wait_tracker().record_embedding_requeue( + embedding_msg.telemetry_id + ) + self.report_requeue() report_success = True return None # No queue manager — cannot re-enqueue, drop with error @@ -345,6 +359,14 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if self._vikingdb.has_queue_manager: try: await self._vikingdb.enqueue_embedding_msg(embedding_msg) + self._merge_request_stats( + embedding_msg.telemetry_id, + requeue_count=1, + ) + get_request_wait_tracker().record_embedding_requeue( + embedding_msg.telemetry_id + ) + self.report_requeue() logger.info( f"Re-enqueued embedding message after transient error: {embedding_msg.id}" ) diff --git a/openviking/storage/observers/queue_observer.py b/openviking/storage/observers/queue_observer.py index 0afd840f8..9dfc74cfe 100644 --- a/openviking/storage/observers/queue_observer.py +++ b/openviking/storage/observers/queue_observer.py @@ -59,6 +59,7 @@ def _format_status_as_table( total_pending = 0 total_in_progress = 0 total_processed = 0 + total_requeues = 0 total_errors = 0 for queue_name, status in statuses.items(): @@ -69,6 +70,7 @@ def _format_status_as_table( "Pending": status.pending, "In Progress": status.in_progress, "Processed": status.processed, + "Requeued": status.requeue_count, "Errors": status.error_count, "Total": total, } @@ -76,6 +78,7 @@ def _format_status_as_table( total_pending += status.pending total_in_progress += status.in_progress total_processed += status.processed + total_requeues += status.requeue_count total_errors += status.error_count data.append( @@ -84,6 +87,7 @@ def _format_status_as_table( "Pending": getattr(dag_stats, "pending_nodes", 0) if dag_stats else 0, "In Progress": getattr(dag_stats, "in_progress_nodes", 0) if dag_stats else 0, "Processed": getattr(dag_stats, "done_nodes", 0) if dag_stats else 0, + "Requeued": 0, "Errors": 0, "Total": getattr(dag_stats, "total_nodes", 0) if dag_stats else 0, } @@ -97,6 +101,7 @@ def _format_status_as_table( "Pending": total_pending, "In Progress": total_in_progress, "Processed": total_processed, + "Requeued": total_requeues, "Errors": total_errors, "Total": total_total, } diff --git a/openviking/storage/queuefs/named_queue.py b/openviking/storage/queuefs/named_queue.py index 8a7fd4f40..f672c440f 100644 --- a/openviking/storage/queuefs/named_queue.py +++ b/openviking/storage/queuefs/named_queue.py @@ -31,6 +31,7 @@ class QueueStatus: pending: int = 0 in_progress: int = 0 processed: int = 0 + requeue_count: int = 0 error_count: int = 0 errors: List[QueueError] = field(default_factory=list) @@ -60,15 +61,18 @@ class DequeueHandlerBase(abc.ABC): """Dequeue handler base class, supports callback mechanism to report processing results.""" _success_callback: Optional[Callable[[], None]] = None + _requeue_callback: Optional[Callable[[], None]] = None _error_callback: Optional[Callable[[str, Optional[Dict[str, Any]]], None]] = None def set_callbacks( self, on_success: Callable[[], None], + on_requeue: Callable[[], None], on_error: Callable[[str, Optional[Dict[str, Any]]], None], ) -> None: """Set callback functions.""" self._success_callback = on_success + self._requeue_callback = on_requeue self._error_callback = on_error def report_success(self) -> None: @@ -76,6 +80,11 @@ def report_success(self) -> None: if self._success_callback: self._success_callback() + def report_requeue(self) -> None: + """Report that the current message was re-enqueued for later retry.""" + if self._requeue_callback: + self._requeue_callback() + def report_error(self, error_msg: str, data: Optional[Dict[str, Any]] = None) -> None: """Report processing error.""" if self._error_callback: @@ -113,6 +122,7 @@ def __init__( self._lock = threading.Lock() self._in_progress = 0 self._processed = 0 + self._requeue_count = 0 self._error_count = 0 self._errors: List[QueueError] = [] @@ -120,6 +130,7 @@ def __init__( if self._dequeue_handler: self._dequeue_handler.set_callbacks( on_success=self._on_process_success, + on_requeue=self._on_process_requeue, on_error=self._on_process_error, ) @@ -134,6 +145,11 @@ def _on_process_success(self) -> None: self._in_progress -= 1 self._processed += 1 + def _on_process_requeue(self) -> None: + """Called when a dequeued message is re-enqueued for later retry.""" + with self._lock: + self._requeue_count += 1 + def _on_process_error(self, error_msg: str, data: Optional[Dict[str, Any]] = None) -> None: """Called on processing failure.""" with self._lock: @@ -157,6 +173,7 @@ async def get_status(self) -> QueueStatus: pending=pending, in_progress=self._in_progress, processed=self._processed, + requeue_count=self._requeue_count, error_count=self._error_count, errors=list(self._errors), ) @@ -166,6 +183,7 @@ def reset_status(self) -> None: with self._lock: self._in_progress = 0 self._processed = 0 + self._requeue_count = 0 self._error_count = 0 self._errors = [] diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 2a6e2653a..efda9dd43 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -55,6 +55,7 @@ class DiffResult: class RequestQueueStats: processed: int = 0 + requeue_count: int = 0 error_count: int = 0 @@ -124,6 +125,7 @@ def _merge_request_stats( cls, telemetry_id: str, processed: int = 0, + requeue_count: int = 0, error_count: int = 0, ) -> None: if not telemetry_id: @@ -131,6 +133,7 @@ def _merge_request_stats( with cls._stats_lock: stats = cls._request_stats_by_telemetry_id.setdefault(telemetry_id, RequestQueueStats()) stats.processed += processed + stats.requeue_count += requeue_count stats.error_count += error_count cls._request_stats_order.append(telemetry_id) if len(cls._request_stats_order) > cls._max_cached_stats: @@ -258,6 +261,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, f"Circuit breaker is open, re-enqueueing semantic message: {msg.uri}" ) await self._reenqueue_semantic_msg(msg) + self._merge_request_stats(msg.telemetry_id, requeue_count=1) + get_request_wait_tracker().record_semantic_requeue(msg.telemetry_id) + self.report_requeue() self.report_success() return None collector = resolve_telemetry(msg.telemetry_id) @@ -348,6 +354,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, if msg is not None: try: await self._reenqueue_semantic_msg(msg) + self._merge_request_stats(msg.telemetry_id, requeue_count=1) + get_request_wait_tracker().record_semantic_requeue(msg.telemetry_id) + self.report_requeue() except Exception as requeue_err: logger.error(f"Failed to re-enqueue semantic message: {requeue_err}") self._merge_request_stats(msg.telemetry_id, error_count=1) diff --git a/openviking/telemetry/operation.py b/openviking/telemetry/operation.py index 08fdd32f3..713aa0ba5 100644 --- a/openviking/telemetry/operation.py +++ b/openviking/telemetry/operation.py @@ -169,10 +169,12 @@ def build( summary["queue"] = { "semantic": { "processed": cls._i(gauges.get("queue.semantic.processed"), 0), + "requeue_count": cls._i(gauges.get("queue.semantic.requeue_count"), 0), "error_count": cls._i(gauges.get("queue.semantic.error_count"), 0), }, "embedding": { "processed": cls._i(gauges.get("queue.embedding.processed"), 0), + "requeue_count": cls._i(gauges.get("queue.embedding.requeue_count"), 0), "error_count": cls._i(gauges.get("queue.embedding.error_count"), 0), }, } diff --git a/openviking/telemetry/request_wait_tracker.py b/openviking/telemetry/request_wait_tracker.py index 69a122797..b22fc1527 100644 --- a/openviking/telemetry/request_wait_tracker.py +++ b/openviking/telemetry/request_wait_tracker.py @@ -16,9 +16,11 @@ class _RequestWaitState: pending_semantic_roots: Set[str] = field(default_factory=set) pending_embedding_roots: Set[str] = field(default_factory=set) semantic_processed: int = 0 + semantic_requeue_count: int = 0 semantic_error_count: int = 0 semantic_errors: List[str] = field(default_factory=list) embedding_processed: int = 0 + embedding_requeue_count: int = 0 embedding_error_count: int = 0 embedding_errors: List[str] = field(default_factory=list) created_at: float = field(default_factory=time.time) @@ -80,6 +82,15 @@ def record_embedding_processed(self, telemetry_id: str, delta: int = 1) -> None: return state.embedding_processed += max(delta, 0) + def record_embedding_requeue(self, telemetry_id: str, delta: int = 1) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.embedding_requeue_count += max(delta, 0) + def record_embedding_error(self, telemetry_id: str, message: str) -> None: if not telemetry_id: return @@ -106,6 +117,15 @@ def mark_semantic_done( state.pending_semantic_roots.discard(semantic_msg_id) state.semantic_processed += max(processed_delta, 0) + def record_semantic_requeue(self, telemetry_id: str, delta: int = 1) -> None: + if not telemetry_id: + return + with self._lock: + state = self._states.get(telemetry_id) + if state is None: + return + state.semantic_requeue_count += max(delta, 0) + def mark_semantic_failed(self, telemetry_id: str, semantic_msg_id: str, message: str) -> None: if not telemetry_id: return @@ -176,11 +196,13 @@ def build_queue_status(self, telemetry_id: str) -> Dict[str, Dict[str, object]]: return { "Semantic": { "processed": state.semantic_processed, + "requeue_count": state.semantic_requeue_count, "error_count": state.semantic_error_count, "errors": [{"message": msg} for msg in state.semantic_errors], }, "Embedding": { "processed": state.embedding_processed, + "requeue_count": state.embedding_requeue_count, "error_count": state.embedding_error_count, "errors": [{"message": msg} for msg in state.embedding_errors], }, diff --git a/openviking/telemetry/resource_summary.py b/openviking/telemetry/resource_summary.py index a43a996f9..5f090f38d 100644 --- a/openviking/telemetry/resource_summary.py +++ b/openviking/telemetry/resource_summary.py @@ -58,6 +58,7 @@ def build_queue_status_payload(status: Dict[str, Any]) -> Dict[str, Dict[str, An return { name: { "processed": s.processed, + "requeue_count": getattr(s, "requeue_count", 0), "error_count": s.error_count, "errors": [{"message": e.message} for e in s.errors], } @@ -73,17 +74,20 @@ def _resolve_queue_group( if explicit_stats is not None: return { "processed": explicit_stats.processed, + "requeue_count": getattr(explicit_stats, "requeue_count", 0), "error_count": explicit_stats.error_count, } if fallback_status is None: - return {"processed": 0, "error_count": 0} + return {"processed": 0, "requeue_count": 0, "error_count": 0} if isinstance(fallback_status, dict): return { "processed": int(fallback_status.get("processed", 0) or 0), + "requeue_count": int(fallback_status.get("requeue_count", 0) or 0), "error_count": int(fallback_status.get("error_count", 0) or 0), } return { "processed": fallback_status.processed, + "requeue_count": getattr(fallback_status, "requeue_count", 0), "error_count": fallback_status.error_count, } @@ -99,8 +103,8 @@ def record_resource_wait_metrics( telemetry = telemetry or get_current_telemetry() if not telemetry.enabled: return { - "semantic": {"processed": 0, "error_count": 0}, - "embedding": {"processed": 0, "error_count": 0}, + "semantic": {"processed": 0, "requeue_count": 0, "error_count": 0}, + "embedding": {"processed": 0, "requeue_count": 0, "error_count": 0}, } semantic = _resolve_queue_group( @@ -113,8 +117,10 @@ def record_resource_wait_metrics( ) telemetry.set("queue.semantic.processed", semantic["processed"]) + telemetry.set("queue.semantic.requeue_count", semantic["requeue_count"]) telemetry.set("queue.semantic.error_count", semantic["error_count"]) telemetry.set("queue.embedding.processed", embedding["processed"]) + telemetry.set("queue.embedding.requeue_count", embedding["requeue_count"]) telemetry.set("queue.embedding.error_count", embedding["error_count"]) dag_stats = _consume_semantic_dag_stats(telemetry_id, root_uri) diff --git a/tests/storage/test_collection_schemas.py b/tests/storage/test_collection_schemas.py index bc636e55d..8047a9d15 100644 --- a/tests/storage/test_collection_schemas.py +++ b/tests/storage/test_collection_schemas.py @@ -94,9 +94,10 @@ async def upsert(self, _data, *, ctx): # pragma: no cover - should never run ) handler = TextEmbeddingHandler(_ClosingVikingDB()) - status = {"success": 0, "error": 0} + status = {"success": 0, "requeue": 0, "error": 0} handler.set_callbacks( on_success=lambda: status.__setitem__("success", status["success"] + 1), + on_requeue=lambda: status.__setitem__("requeue", status["requeue"] + 1), on_error=lambda *_: status.__setitem__("error", status["error"] + 1), ) @@ -105,6 +106,7 @@ async def upsert(self, _data, *, ctx): # pragma: no cover - should never run assert result is None assert embedder.calls == 0 assert status["success"] == 1 + assert status["requeue"] == 0 assert status["error"] == 0 @@ -132,6 +134,12 @@ async def enqueue_embedding_msg(self, msg): ) handler = TextEmbeddingHandler(_QueueingVikingDB()) + status = {"success": 0, "requeue": 0, "error": 0} + handler.set_callbacks( + on_success=lambda: status.__setitem__("success", status["success"] + 1), + on_requeue=lambda: status.__setitem__("requeue", status["requeue"] + 1), + on_error=lambda *_: status.__setitem__("error", status["error"] + 1), + ) monkeypatch.setattr( handler._circuit_breaker, "check", @@ -151,6 +159,7 @@ async def enqueue_embedding_msg(self, msg): warnings = [record.message for record in caplog.records if record.levelno == logging.WARNING] assert warnings.count("Embedding circuit breaker is open; re-enqueueing messages") == 1 + assert status == {"success": 2, "requeue": 2, "error": 0} @pytest.mark.asyncio @@ -173,9 +182,10 @@ async def upsert(self, _data, *, ctx): vikingdb = _ClosingDuringUpsertVikingDB() handler = TextEmbeddingHandler(vikingdb) - status = {"success": 0, "error": 0} + status = {"success": 0, "requeue": 0, "error": 0} handler.set_callbacks( on_success=lambda: status.__setitem__("success", status["success"] + 1), + on_requeue=lambda: status.__setitem__("requeue", status["requeue"] + 1), on_error=lambda *_: status.__setitem__("error", status["error"] + 1), ) @@ -185,6 +195,7 @@ async def upsert(self, _data, *, ctx): assert vikingdb.calls == 1 assert embedder.calls == 1 assert status["success"] == 1 + assert status["requeue"] == 0 assert status["error"] == 0 @@ -249,9 +260,10 @@ async def decrement(self, _semantic_msg_id): ) handler = TextEmbeddingHandler(_CapturingVikingDB()) - status = {"success": 0, "error": 0} + status = {"success": 0, "requeue": 0, "error": 0} handler.set_callbacks( on_success=lambda: status.__setitem__("success", status["success"] + 1), + on_requeue=lambda: status.__setitem__("requeue", status["requeue"] + 1), on_error=lambda *_: status.__setitem__("error", status["error"] + 1), ) @@ -264,12 +276,14 @@ async def decrement(self, _semantic_msg_id): await decrement_started.wait() assert status["success"] == 0 + assert status["requeue"] == 0 assert status["error"] == 0 allow_decrement_finish.set() await task assert status["success"] == 1 + assert status["requeue"] == 0 assert status["error"] == 0 diff --git a/tests/telemetry/test_request_wait_tracker.py b/tests/telemetry/test_request_wait_tracker.py index 9e5850e1e..61640e6d6 100644 --- a/tests/telemetry/test_request_wait_tracker.py +++ b/tests/telemetry/test_request_wait_tracker.py @@ -16,8 +16,8 @@ def test_request_wait_tracker_cleanup_prevents_state_recreation(): tracker.mark_embedding_done(telemetry_id, "embedding-1") assert tracker.build_queue_status(telemetry_id) == { - "Semantic": {"processed": 0, "error_count": 0, "errors": []}, - "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + "Semantic": {"processed": 0, "requeue_count": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "requeue_count": 0, "error_count": 0, "errors": []}, } @@ -33,6 +33,20 @@ def test_request_wait_tracker_cleanup_prevents_root_recreation(): assert tracker.is_complete(telemetry_id) is True assert tracker.build_queue_status(telemetry_id) == { - "Semantic": {"processed": 0, "error_count": 0, "errors": []}, - "Embedding": {"processed": 0, "error_count": 0, "errors": []}, + "Semantic": {"processed": 0, "requeue_count": 0, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "requeue_count": 0, "error_count": 0, "errors": []}, + } + + +def test_request_wait_tracker_records_requeues(): + tracker = RequestWaitTracker() + telemetry_id = "tm_requeue" + + tracker.register_request(telemetry_id) + tracker.record_semantic_requeue(telemetry_id) + tracker.record_embedding_requeue(telemetry_id, delta=2) + + assert tracker.build_queue_status(telemetry_id) == { + "Semantic": {"processed": 0, "requeue_count": 1, "error_count": 0, "errors": []}, + "Embedding": {"processed": 0, "requeue_count": 2, "error_count": 0, "errors": []}, } diff --git a/tests/test_telemetry_runtime.py b/tests/test_telemetry_runtime.py index 7c6dbe7fb..967ce205a 100644 --- a/tests/test_telemetry_runtime.py +++ b/tests/test_telemetry_runtime.py @@ -197,6 +197,11 @@ def __init__(self): self.embedding = SimpleNamespace( dimension=2, get_embedder=lambda: _TelemetryAwareEmbedder(), + circuit_breaker=SimpleNamespace( + failure_threshold=5, + reset_timeout=300.0, + max_reset_timeout=300.0, + ), ) class _DummyVikingDB: @@ -240,6 +245,20 @@ async def upsert(self, _data, *, ctx=None): @pytest.mark.asyncio async def test_resource_service_add_resource_reports_queue_summary(monkeypatch): telemetry = MemoryOperationTelemetry(operation="resources.add_resource", enabled=True) + queue_status = { + "Semantic": { + "processed": 2, + "requeue_count": 0, + "error_count": 1, + "errors": [], + }, + "Embedding": { + "processed": 5, + "requeue_count": 0, + "error_count": 0, + "errors": [], + }, + } class _DummyProcessor: async def process_resource(self, **kwargs): @@ -248,16 +267,24 @@ async def process_resource(self, **kwargs): "root_uri": "viking://resources/demo", } - class _DummyQueueManager: - async def wait_complete(self, timeout=None): - return { - "Semantic": SimpleNamespace(processed=2, error_count=1, errors=[]), - "Embedding": SimpleNamespace(processed=5, error_count=0, errors=[]), - } + class _DummyRequestWaitTracker: + def register_request(self, telemetry_id: str) -> None: + del telemetry_id + + async def wait_for_request(self, telemetry_id: str, timeout=None) -> None: + del telemetry_id, timeout + + def build_queue_status(self, telemetry_id: str): + del telemetry_id + return queue_status + + def cleanup(self, telemetry_id: str) -> None: + del telemetry_id monkeypatch.setattr( - "openviking.service.resource_service.get_queue_manager", - lambda: _DummyQueueManager(), + "openviking.service.resource_service.get_request_wait_tracker", + lambda: _DummyRequestWaitTracker(), + raising=False, ) class _DagStats: