Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openviking/service/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any
return {
name: {
"processed": s.processed,
"requeue_count": getattr(s, "requeue_count", 0),
"error_count": s.error_count,
"errors": [{"message": e.message} for e in s.errors],
}
Expand Down
24 changes: 23 additions & 1 deletion openviking/storage/collection_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
@dataclass
class RequestQueueStats:
    """Per-telemetry-id counters for request-queue processing outcomes."""

    processed: int = 0      # messages fully processed
    requeue_count: int = 0  # messages re-enqueued for a later retry
    error_count: int = 0    # messages that failed processing


Expand Down Expand Up @@ -207,13 +208,18 @@ def _log_breaker_open_reenqueue_summary(self) -> None:

@classmethod
def _merge_request_stats(
cls, telemetry_id: str, processed: int = 0, error_count: int = 0
cls,
telemetry_id: str,
processed: int = 0,
requeue_count: int = 0,
error_count: int = 0,
) -> None:
if not telemetry_id:
return
with cls._request_stats_lock:
stats = cls._request_stats_by_telemetry_id.setdefault(telemetry_id, RequestQueueStats())
stats.processed += processed
stats.requeue_count += requeue_count
stats.error_count += error_count
cls._request_stats_order.append(telemetry_id)
if len(cls._request_stats_order) > cls._max_cached_stats:
Expand Down Expand Up @@ -291,6 +297,14 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
if wait > 0:
await asyncio.sleep(wait)
await self._vikingdb.enqueue_embedding_msg(embedding_msg)
self._merge_request_stats(
embedding_msg.telemetry_id,
requeue_count=1,
)
get_request_wait_tracker().record_embedding_requeue(
embedding_msg.telemetry_id
)
self.report_requeue()
report_success = True
return None
# No queue manager — cannot re-enqueue, drop with error
Expand Down Expand Up @@ -345,6 +359,14 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
if self._vikingdb.has_queue_manager:
try:
await self._vikingdb.enqueue_embedding_msg(embedding_msg)
self._merge_request_stats(
embedding_msg.telemetry_id,
requeue_count=1,
)
get_request_wait_tracker().record_embedding_requeue(
embedding_msg.telemetry_id
)
self.report_requeue()
logger.info(
f"Re-enqueued embedding message after transient error: {embedding_msg.id}"
)
Expand Down
5 changes: 5 additions & 0 deletions openviking/storage/observers/queue_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def _format_status_as_table(
total_pending = 0
total_in_progress = 0
total_processed = 0
total_requeues = 0
total_errors = 0

for queue_name, status in statuses.items():
Expand All @@ -69,13 +70,15 @@ def _format_status_as_table(
"Pending": status.pending,
"In Progress": status.in_progress,
"Processed": status.processed,
"Requeued": status.requeue_count,
"Errors": status.error_count,
"Total": total,
}
)
total_pending += status.pending
total_in_progress += status.in_progress
total_processed += status.processed
total_requeues += status.requeue_count
total_errors += status.error_count

data.append(
Expand All @@ -84,6 +87,7 @@ def _format_status_as_table(
"Pending": getattr(dag_stats, "pending_nodes", 0) if dag_stats else 0,
"In Progress": getattr(dag_stats, "in_progress_nodes", 0) if dag_stats else 0,
"Processed": getattr(dag_stats, "done_nodes", 0) if dag_stats else 0,
"Requeued": 0,
"Errors": 0,
"Total": getattr(dag_stats, "total_nodes", 0) if dag_stats else 0,
}
Expand All @@ -97,6 +101,7 @@ def _format_status_as_table(
"Pending": total_pending,
"In Progress": total_in_progress,
"Processed": total_processed,
"Requeued": total_requeues,
"Errors": total_errors,
"Total": total_total,
}
Expand Down
18 changes: 18 additions & 0 deletions openviking/storage/queuefs/named_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class QueueStatus:
pending: int = 0
in_progress: int = 0
processed: int = 0
requeue_count: int = 0
error_count: int = 0
errors: List[QueueError] = field(default_factory=list)

Expand Down Expand Up @@ -60,22 +61,30 @@ class DequeueHandlerBase(abc.ABC):
"""Dequeue handler base class, supports callback mechanism to report processing results."""

_success_callback: Optional[Callable[[], None]] = None
_requeue_callback: Optional[Callable[[], None]] = None
_error_callback: Optional[Callable[[str, Optional[Dict[str, Any]]], None]] = None

def set_callbacks(
    self,
    on_success: Callable[[], None],
    on_requeue: Callable[[], None],
    on_error: Callable[[str, Optional[Dict[str, Any]]], None],
) -> None:
    """Install the outcome callbacks invoked via the report_* helpers.

    Args:
        on_success: Invoked when a message is processed successfully.
        on_requeue: Invoked when a message is put back for a later retry.
        on_error: Invoked with an error message and optional payload on failure.
    """
    self._error_callback = on_error
    self._requeue_callback = on_requeue
    self._success_callback = on_success

def report_success(self) -> None:
    """Notify the owner that the current message was processed successfully.

    No-op when no success callback has been installed.
    """
    callback = self._success_callback
    if callback:
        callback()

def report_requeue(self) -> None:
    """Notify the owner that the current message was put back for retry.

    No-op when no requeue callback has been installed.
    """
    callback = self._requeue_callback
    if callback:
        callback()

def report_error(self, error_msg: str, data: Optional[Dict[str, Any]] = None) -> None:
"""Report processing error."""
if self._error_callback:
Expand Down Expand Up @@ -113,13 +122,15 @@ def __init__(
self._lock = threading.Lock()
self._in_progress = 0
self._processed = 0
self._requeue_count = 0
self._error_count = 0
self._errors: List[QueueError] = []

# Inject callbacks to handler
if self._dequeue_handler:
self._dequeue_handler.set_callbacks(
on_success=self._on_process_success,
on_requeue=self._on_process_requeue,
on_error=self._on_process_error,
)

Expand All @@ -134,6 +145,11 @@ def _on_process_success(self) -> None:
self._in_progress -= 1
self._processed += 1

def _on_process_requeue(self) -> None:
"""Called when a dequeued message is re-enqueued for later retry."""
with self._lock:
self._requeue_count += 1

def _on_process_error(self, error_msg: str, data: Optional[Dict[str, Any]] = None) -> None:
"""Called on processing failure."""
with self._lock:
Expand All @@ -157,6 +173,7 @@ async def get_status(self) -> QueueStatus:
pending=pending,
in_progress=self._in_progress,
processed=self._processed,
requeue_count=self._requeue_count,
error_count=self._error_count,
errors=list(self._errors),
)
Expand All @@ -166,6 +183,7 @@ def reset_status(self) -> None:
with self._lock:
self._in_progress = 0
self._processed = 0
self._requeue_count = 0
self._error_count = 0
self._errors = []

Expand Down
9 changes: 9 additions & 0 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class DiffResult:

class RequestQueueStats:
    """Per-telemetry-id counters for semantic-queue processing outcomes.

    NOTE(review): unlike the version in collection_schemas.py this class is
    not decorated with @dataclass; the counters below are class-level
    defaults and become instance attributes on the first in-place update —
    confirm this asymmetry is intended.
    """

    processed: int = 0      # messages fully processed
    requeue_count: int = 0  # messages re-enqueued for a later retry
    error_count: int = 0    # messages that failed processing


Expand Down Expand Up @@ -124,13 +125,15 @@ def _merge_request_stats(
cls,
telemetry_id: str,
processed: int = 0,
requeue_count: int = 0,
error_count: int = 0,
) -> None:
if not telemetry_id:
return
with cls._stats_lock:
stats = cls._request_stats_by_telemetry_id.setdefault(telemetry_id, RequestQueueStats())
stats.processed += processed
stats.requeue_count += requeue_count
stats.error_count += error_count
cls._request_stats_order.append(telemetry_id)
if len(cls._request_stats_order) > cls._max_cached_stats:
Expand Down Expand Up @@ -258,6 +261,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
f"Circuit breaker is open, re-enqueueing semantic message: {msg.uri}"
)
await self._reenqueue_semantic_msg(msg)
self._merge_request_stats(msg.telemetry_id, requeue_count=1)
get_request_wait_tracker().record_semantic_requeue(msg.telemetry_id)
self.report_requeue()
self.report_success()
return None
collector = resolve_telemetry(msg.telemetry_id)
Expand Down Expand Up @@ -348,6 +354,9 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
if msg is not None:
try:
await self._reenqueue_semantic_msg(msg)
self._merge_request_stats(msg.telemetry_id, requeue_count=1)
get_request_wait_tracker().record_semantic_requeue(msg.telemetry_id)
self.report_requeue()
except Exception as requeue_err:
logger.error(f"Failed to re-enqueue semantic message: {requeue_err}")
self._merge_request_stats(msg.telemetry_id, error_count=1)
Expand Down
2 changes: 2 additions & 0 deletions openviking/telemetry/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,12 @@ def build(
summary["queue"] = {
"semantic": {
"processed": cls._i(gauges.get("queue.semantic.processed"), 0),
"requeue_count": cls._i(gauges.get("queue.semantic.requeue_count"), 0),
"error_count": cls._i(gauges.get("queue.semantic.error_count"), 0),
},
"embedding": {
"processed": cls._i(gauges.get("queue.embedding.processed"), 0),
"requeue_count": cls._i(gauges.get("queue.embedding.requeue_count"), 0),
"error_count": cls._i(gauges.get("queue.embedding.error_count"), 0),
},
}
Expand Down
22 changes: 22 additions & 0 deletions openviking/telemetry/request_wait_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ class _RequestWaitState:
pending_semantic_roots: Set[str] = field(default_factory=set)
pending_embedding_roots: Set[str] = field(default_factory=set)
semantic_processed: int = 0
semantic_requeue_count: int = 0
semantic_error_count: int = 0
semantic_errors: List[str] = field(default_factory=list)
embedding_processed: int = 0
embedding_requeue_count: int = 0
embedding_error_count: int = 0
embedding_errors: List[str] = field(default_factory=list)
created_at: float = field(default_factory=time.time)
Expand Down Expand Up @@ -80,6 +82,15 @@ def record_embedding_processed(self, telemetry_id: str, delta: int = 1) -> None:
return
state.embedding_processed += max(delta, 0)

def record_embedding_requeue(self, telemetry_id: str, delta: int = 1) -> None:
    """Add *delta* (clamped to be non-negative) to a request's embedding requeue count.

    Empty telemetry ids and ids with no tracked state are ignored silently.
    """
    if not telemetry_id:
        return
    increment = delta if delta > 0 else 0
    with self._lock:
        tracked = self._states.get(telemetry_id)
        if tracked is not None:
            tracked.embedding_requeue_count += increment

def record_embedding_error(self, telemetry_id: str, message: str) -> None:
if not telemetry_id:
return
Expand All @@ -106,6 +117,15 @@ def mark_semantic_done(
state.pending_semantic_roots.discard(semantic_msg_id)
state.semantic_processed += max(processed_delta, 0)

def record_semantic_requeue(self, telemetry_id: str, delta: int = 1) -> None:
    """Add *delta* (clamped to be non-negative) to a request's semantic requeue count.

    Empty telemetry ids and ids with no tracked state are ignored silently.
    """
    if not telemetry_id:
        return
    increment = delta if delta > 0 else 0
    with self._lock:
        tracked = self._states.get(telemetry_id)
        if tracked is not None:
            tracked.semantic_requeue_count += increment

def mark_semantic_failed(self, telemetry_id: str, semantic_msg_id: str, message: str) -> None:
if not telemetry_id:
return
Expand Down Expand Up @@ -176,11 +196,13 @@ def build_queue_status(self, telemetry_id: str) -> Dict[str, Dict[str, object]]:
return {
"Semantic": {
"processed": state.semantic_processed,
"requeue_count": state.semantic_requeue_count,
"error_count": state.semantic_error_count,
"errors": [{"message": msg} for msg in state.semantic_errors],
},
"Embedding": {
"processed": state.embedding_processed,
"requeue_count": state.embedding_requeue_count,
"error_count": state.embedding_error_count,
"errors": [{"message": msg} for msg in state.embedding_errors],
},
Expand Down
12 changes: 9 additions & 3 deletions openviking/telemetry/resource_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def build_queue_status_payload(status: Dict[str, Any]) -> Dict[str, Dict[str, An
return {
name: {
"processed": s.processed,
"requeue_count": getattr(s, "requeue_count", 0),
"error_count": s.error_count,
"errors": [{"message": e.message} for e in s.errors],
}
Expand All @@ -73,17 +74,20 @@ def _resolve_queue_group(
if explicit_stats is not None:
return {
"processed": explicit_stats.processed,
"requeue_count": getattr(explicit_stats, "requeue_count", 0),
"error_count": explicit_stats.error_count,
}
if fallback_status is None:
return {"processed": 0, "error_count": 0}
return {"processed": 0, "requeue_count": 0, "error_count": 0}
if isinstance(fallback_status, dict):
return {
"processed": int(fallback_status.get("processed", 0) or 0),
"requeue_count": int(fallback_status.get("requeue_count", 0) or 0),
"error_count": int(fallback_status.get("error_count", 0) or 0),
}
return {
"processed": fallback_status.processed,
"requeue_count": getattr(fallback_status, "requeue_count", 0),
"error_count": fallback_status.error_count,
}

Expand All @@ -99,8 +103,8 @@ def record_resource_wait_metrics(
telemetry = telemetry or get_current_telemetry()
if not telemetry.enabled:
return {
"semantic": {"processed": 0, "error_count": 0},
"embedding": {"processed": 0, "error_count": 0},
"semantic": {"processed": 0, "requeue_count": 0, "error_count": 0},
"embedding": {"processed": 0, "requeue_count": 0, "error_count": 0},
}

semantic = _resolve_queue_group(
Expand All @@ -113,8 +117,10 @@ def record_resource_wait_metrics(
)

telemetry.set("queue.semantic.processed", semantic["processed"])
telemetry.set("queue.semantic.requeue_count", semantic["requeue_count"])
telemetry.set("queue.semantic.error_count", semantic["error_count"])
telemetry.set("queue.embedding.processed", embedding["processed"])
telemetry.set("queue.embedding.requeue_count", embedding["requeue_count"])
telemetry.set("queue.embedding.error_count", embedding["error_count"])

dag_stats = _consume_semantic_dag_stats(telemetry_id, root_uri)
Expand Down
Loading
Loading