Skip to content

Commit c5de4e0

Browse files
Evrard-Nil and Claude
authored
fix: prevent infinite SSE error loop and remove aggressive read_timeout (#453)
* fix: prevent infinite SSE error loop and remove aggressive read_timeout Two bugs caused ~20% of concurrent streaming requests through the gateway to produce infinite `data: error: Failed to perform completion: error decoding response body` lines (multi-GB responses of repeated errors): 1. SSE parser never terminated after a byte-stream error - it kept polling the broken reqwest stream, which kept returning errors. Added a `finished` flag to BufferedSSEParser that stops polling after the first error or stream end. 2. The vLLM HTTP client set `read_timeout` (default 30s) which applies to every individual chunk read. Under concurrent load, inter-chunk gaps can exceed this, causing spurious timeouts. Removed it to match the pattern used by external providers (Anthropic, OpenAI, Gemini) which only set connect_timeout and pool_idle_timeout. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: add per-request timeouts to non-streaming vLLM methods After removing the client-level read_timeout, the non-streaming methods (get_signature, get_attestation_report, models, chat_completion) were left without timeout protection. Add explicit per-request .timeout() using config.timeout_seconds to prevent hanging connections and resource exhaustion. Streaming methods (chat_completion_stream, text_completion_stream) intentionally omit per-request timeout since reqwest's .timeout() covers the entire request lifecycle including body streaming, which would kill long-running SSE streams. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: add TTFB timeout to streaming methods and fix cargo fmt Add per-request `.timeout()` to `chat_completion_stream` and `text_completion_stream` to protect against backends that accept connections but never respond. In reqwest, the per-request timeout covers connection + response headers; the body stream is consumed lazily after `.send()` resolves, so this does not reintroduce the per-chunk timeout issue. 
Also fix cargo fmt formatting in the SSE parser test. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 73db980 commit c5de4e0

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

crates/inference_providers/src/sse_parser.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ pub struct BufferedSSEParser<S, P: SSEEventParser> {
6060
/// Pending results from previous process_buffer() calls.
6161
/// Multiple SSE events can arrive in a single network packet.
6262
pending_results: VecDeque<Result<SSEEvent, CompletionError>>,
63+
/// Set to true after the underlying byte stream returns an error or ends.
64+
/// Prevents infinite error loops when the stream is broken.
65+
finished: bool,
6366
state: P::State,
6467
_marker: PhantomData<P>,
6568
}
@@ -76,6 +79,7 @@ where
7679
buffer: String::new(),
7780
bytes_buffer: Vec::new(),
7881
pending_results: VecDeque::new(),
82+
finished: false,
7983
state,
8084
_marker: PhantomData,
8185
}
@@ -135,6 +139,13 @@ where
135139
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136140
let this = self.get_mut();
137141

142+
// If the underlying stream has errored or ended, don't poll it again.
143+
// This prevents infinite error loops when the byte stream is broken
144+
// (e.g., due to read timeouts under load).
145+
if this.finished {
146+
return Poll::Ready(None);
147+
}
148+
138149
loop {
139150
// First, return any pending results from previous process_buffer() calls
140151
if let Some(result) = this.pending_results.pop_front() {
@@ -158,9 +169,12 @@ where
158169
continue;
159170
}
160171
Poll::Ready(Some(Err(e))) => {
172+
// Mark stream as finished so we don't poll the broken stream again
173+
this.finished = true;
161174
return Poll::Ready(Some(Err(CompletionError::CompletionError(e.to_string()))));
162175
}
163176
Poll::Ready(None) => {
177+
this.finished = true;
164178
// Stream ended - process any remaining buffer content
165179
if !this.buffer.trim().is_empty() {
166180
warn!("Incomplete SSE data in buffer at stream end");
@@ -383,4 +397,63 @@ mod tests {
383397
assert_eq!(events.len(), 1, "Expected 1 event, got {}", events.len());
384398
assert!(events[0].is_ok());
385399
}
400+
401+
#[tokio::test]
402+
async fn test_sse_parser_terminates_after_stream_error() {
403+
// Test that the parser stops polling the underlying stream after an error.
404+
// We use a custom Stream impl that panics if polled after returning an error,
405+
// proving the `finished` flag prevents infinite error loops.
406+
use std::sync::atomic::{AtomicU8, Ordering};
407+
use std::sync::Arc;
408+
use std::task::Poll;
409+
410+
struct ErrorThenPanicStream {
411+
state: Arc<AtomicU8>, // 0=send_ok, 1=send_none, 2+=panic
412+
}
413+
414+
impl Stream for ErrorThenPanicStream {
415+
type Item = Result<bytes::Bytes, reqwest::Error>;
416+
417+
fn poll_next(
418+
self: Pin<&mut Self>,
419+
_cx: &mut std::task::Context<'_>,
420+
) -> Poll<Option<Self::Item>> {
421+
let s = self.state.fetch_add(1, Ordering::SeqCst);
422+
match s {
423+
0 => {
424+
// First poll: return a valid SSE chunk
425+
Poll::Ready(Some(Ok(bytes::Bytes::from(
426+
"data: {\"id\":\"1\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"test\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n"
427+
))))
428+
}
429+
1 => {
430+
// Second poll: stream ends (simulating a broken connection)
431+
// We return None here since we can't easily construct a reqwest::Error.
432+
// The `finished` flag is also set on stream end (Poll::Ready(None)).
433+
Poll::Ready(None)
434+
}
435+
_ => {
436+
// Third+ poll: should never happen if `finished` flag works
437+
panic!("Stream was polled after ending! The `finished` flag is broken.");
438+
}
439+
}
440+
}
441+
}
442+
443+
impl Unpin for ErrorThenPanicStream {}
444+
445+
let stream = ErrorThenPanicStream {
446+
state: Arc::new(AtomicU8::new(0)),
447+
};
448+
449+
let parser =
450+
BufferedSSEParser::<_, OpenAIEventParser>::new(stream, OpenAIParserState::new(true));
451+
let events: Vec<_> = parser.collect().await;
452+
453+
// Should have exactly 1 event (the good chunk).
454+
// The stream ended after that, and the parser must NOT poll again.
455+
// If the `finished` flag is broken, the ErrorThenPanicStream will panic.
456+
assert_eq!(events.len(), 1, "Expected 1 event, got {}", events.len());
457+
assert!(events[0].is_ok(), "Event should be Ok");
458+
}
386459
}

crates/inference_providers/src/vllm/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ impl VLlmProvider {
6969
let client = Client::builder()
7070
.connect_timeout(std::time::Duration::from_secs(30))
7171
.pool_idle_timeout(std::time::Duration::from_secs(90))
72-
.read_timeout(std::time::Duration::from_secs(
73-
config.timeout_seconds as u64,
74-
))
7572
.build()
7673
.expect("Failed to create HTTP client");
7774

@@ -151,6 +148,7 @@ impl InferenceProvider for VLlmProvider {
151148
.client
152149
.get(&url)
153150
.headers(headers)
151+
.timeout(Duration::from_secs(self.config.timeout_seconds as u64))
154152
.send()
155153
.await
156154
.map_err(|e| CompletionError::CompletionError(e.to_string()))?;
@@ -198,6 +196,7 @@ impl InferenceProvider for VLlmProvider {
198196
.client
199197
.get(&url)
200198
.headers(headers)
199+
.timeout(Duration::from_secs(self.config.timeout_seconds as u64))
201200
.send()
202201
.await
203202
.map_err(|e| AttestationError::FetchError(e.to_string()))?;
@@ -237,6 +236,7 @@ impl InferenceProvider for VLlmProvider {
237236
.client
238237
.get(&url)
239238
.headers(headers)
239+
.timeout(Duration::from_secs(self.config.timeout_seconds as u64))
240240
.send()
241241
.await
242242
.map_err(|e| ListModelsError::FetchError(format!("{e:?}")))?;
@@ -288,6 +288,7 @@ impl InferenceProvider for VLlmProvider {
288288
.post(&url)
289289
.headers(headers)
290290
.json(&streaming_params)
291+
.timeout(Duration::from_secs(self.config.timeout_seconds as u64))
291292
.send()
292293
.await
293294
.map_err(|e| CompletionError::CompletionError(e.to_string()))?;
@@ -336,6 +337,7 @@ impl InferenceProvider for VLlmProvider {
336337
.post(&url)
337338
.headers(headers)
338339
.json(&non_streaming_params)
340+
.timeout(Duration::from_secs(self.config.timeout_seconds as u64))
339341
.send()
340342
.await
341343
.map_err(|e| CompletionError::CompletionError(e.to_string()))?;
@@ -396,6 +398,7 @@ impl InferenceProvider for VLlmProvider {
396398
.post(&url)
397399
.headers(headers)
398400
.json(&streaming_params)
401+
.timeout(Duration::from_secs(self.config.timeout_seconds as u64))
399402
.send()
400403
.await
401404
.map_err(|e| CompletionError::CompletionError(e.to_string()))?;

0 commit comments

Comments
 (0)