Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 51 additions & 43 deletions src/routers/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ impl Router {
self.worker_registry.get_all_urls()
}

/// Get all registered workers (for testing/diagnostics)
pub fn get_workers(&self) -> Vec<Arc<dyn Worker>> {
self.worker_registry.get_all()
}

/// Get worker URLs for a specific model
pub fn get_worker_urls_for_model(&self, model_id: Option<&str>) -> Vec<String> {
let workers = match model_id {
Expand Down Expand Up @@ -593,13 +598,6 @@ impl Router {
false
};

// Keep a clone for potential cleanup on retry
let worker_for_cleanup = if load_incremented {
Some(worker.clone())
} else {
None
};

let response = self
.send_typed_request(
headers,
Expand All @@ -617,18 +615,6 @@ impl Router {
let status = response.status();
worker.record_outcome(status.is_success() || status.is_client_error());

// For retryable failures, we need to decrement load since send_typed_request
// won't have done it (it only decrements on success or non-retryable failures)
if is_retryable_status(response.status()) && load_incremented {
if let Some(cleanup_worker) = worker_for_cleanup {
cleanup_worker.decrement_load();
RouterMetrics::set_running_requests(
cleanup_worker.url(),
cleanup_worker.load(),
);
}
}

response
},
// should_retry predicate
Expand Down Expand Up @@ -966,33 +952,55 @@ impl Router {
let mut decremented = false;
let mut usage_extractor =
stream_run_id.map(usage_metrics::SseUsageExtractor::new);
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
// Extract per-run usage from streaming chunks.
// Buffered across chunks because TCP segment
// boundaries can split SSE lines.
if let Some(extractor) = usage_extractor.as_mut() {
extractor.push_chunk(&bytes);
}
// Check for stream end marker
if bytes
.as_ref()
.windows(12)
.any(|window| window == b"data: [DONE]")
{
if let Some(worker) = registry.get_by_url(&worker_url) {
worker.decrement_load();
RouterMetrics::set_running_requests(&worker_url, worker.load());
decremented = true;
loop {
match tokio::time::timeout(
std::time::Duration::from_secs(300),
stream.next(),
).await {
Ok(Some(chunk)) => {
match chunk {
Ok(bytes) => {
// Extract per-run usage from streaming chunks.
// Buffered across chunks because TCP segment
// boundaries can split SSE lines.
if let Some(extractor) = usage_extractor.as_mut() {
extractor.push_chunk(&bytes);
}
// Check for stream end marker
if bytes
.as_ref()
.windows(12)
.any(|window| window == b"data: [DONE]")
{
if let Some(worker) = registry.get_by_url(&worker_url) {
worker.decrement_load();
RouterMetrics::set_running_requests(&worker_url, worker.load());
decremented = true;
}
}
if tx.send(Ok(bytes)).is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(Err(format!("Stream error: {}", e)));
break;
}
}
if tx.send(Ok(bytes)).is_err() {
break;
}
}
Err(e) => {
let _ = tx.send(Err(format!("Stream error: {}", e)));
Ok(None) => {
// Stream ended normally
break;
}
Err(_elapsed) => {
// Upstream stalled for 60s — notify client and bail
tracing::warn!(
"Stream from {} timed out after 300s of inactivity, closing",
worker_url
);
let _ = tx.send(Err(
"stream timeout: upstream worker did not send data for 300 seconds".to_string()
));
break;
}
}
Expand Down
Loading
Loading