Skip to content
87 changes: 67 additions & 20 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import {
parseStringToURLObject,
SEMANTIC_ATTRIBUTE_SENTRY_OP,
setHttpStatus,
startSpan,
startSpanManual,
winterCGHeadersToDict,
withIsolationScope,
} from '@sentry/core';
import type { CloudflareOptions } from './client';
import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
import { init } from './sdk';
import { classifyResponseStreaming } from './utils/streaming';

interface RequestHandlerWrapperOptions {
options: CloudflareOptions;
Expand Down Expand Up @@ -97,30 +98,76 @@ export function wrapRequestHandler(
// Note: This span will not have a duration unless I/O happens in the handler. This is
// because of how the cloudflare workers runtime works.
// See: https://developers.cloudflare.com/workers/runtime-apis/performance/
return startSpan(
{
name,
attributes,
},
async span => {

// Use startSpanManual to control when span ends (needed for streaming responses)
return startSpanManual({ name, attributes }, async span => {
let res: Response;

try {
res = await handler();
setHttpStatus(span, res.status);

// After the handler runs, the span name might have been updated by nested instrumentation
// (e.g., Remix parameterizing routes). The span should already have the correct name
// from that instrumentation, so we don't need to do anything here.
} catch (e) {
span.end();
if (captureErrors) {
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
}
waitUntil?.(flush(2000));
throw e;
}

// Classify response to detect actual streaming
const classification = classifyResponseStreaming(res);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m: What happens if we misclassify a non-streaming response as a streaming response? Would creating the stream readers break anything?

I guess the other way around isn't "dangerous", given we'd just end the span too early, correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For false positives (non-streaming responses classified as streaming), nothing breaks functionally; the only downside is slightly more overhead from creating an unnecessary stream monitor. For false negatives, yes, you're correct: the span would end too early, but this heuristic is our best guess for now.


if (classification.isStreaming && res.body) {
// Streaming response detected - monitor consumption to keep span alive
try {
const res = await handler();
setHttpStatus(span, res.status);
const [clientStream, monitorStream] = res.body.tee();

// After the handler runs, the span name might have been updated by nested instrumentation
// (e.g., Remix parameterizing routes). The span should already have the correct name
// from that instrumentation, so we don't need to do anything here.
return res;
// Monitor stream consumption and end span when complete
const streamMonitor = (async () => {
const reader = monitorStream.getReader();

try {
let done = false;
while (!done) {
const result = await reader.read();
done = result.done;
}
} catch {
// Stream error or cancellation - will end span in finally
} finally {
reader.releaseLock();
span.end();
waitUntil?.(flush(2000));
}
})();

// Keep worker alive until stream monitoring completes (otherwise span won't end)
waitUntil?.(streamMonitor);

// Return response with client stream
return new Response(clientStream, {
status: res.status,
statusText: res.statusText,
headers: res.headers,
});
} catch (e) {
if (captureErrors) {
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
}
throw e;
} finally {
// tee() failed (e.g. stream already locked) - fall back to non-streaming handling
span.end();
waitUntil?.(flush(2000));
return res;
}
},
);
}

// Non-streaming response - end span immediately and return original
span.end();
waitUntil?.(flush(2000));
return res;
});
},
);
});
Expand Down
41 changes: 41 additions & 0 deletions packages/cloudflare/src/utils/streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Result of classifying a Response as streaming or non-streaming.
*/
export type StreamingGuess = {
/** Whether the response was classified as a streaming response. */
isStreaming: boolean;
};

/**
* Classifies a Response as streaming or non-streaming.
*
* The decision is based only on whether a body is present and on the
* Content-Type / Content-Length headers; the body itself is never read.
*
* Heuristics:
* - No body → not streaming
* - Known streaming Content-Types → streaming (SSE, NDJSON, JSON streaming)
* - text/plain without Content-Length → streaming (some AI APIs)
* - Otherwise → not streaming (conservative default, including HTML/SSR)
*
* We avoid probing the stream to prevent blocking on transform streams (like injectTraceMetaTags)
* or SSR streams that may not have data ready immediately.
*
* @param res - The response to classify. It is only inspected, not modified or consumed.
* @returns A guess object whose `isStreaming` flag is true only when one of the streaming heuristics matches.
*/
export function classifyResponseStreaming(res: Response): StreamingGuess {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

l/nit: Is there a reason why we return the response as well? I don't see any changes being made to it, so could a caller just directly reuse res instead of using the returned response?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch! I was mutating the res in a previous commit, these are just leftovers 😅

// A response without a body has nothing to stream.
if (!res.body) {
return { isStreaming: false };
}

// A missing Content-Type falls back to '' so none of the patterns below can match.
const contentType = res.headers.get('content-type') ?? '';
// null when the header is absent; only its presence matters for the text/plain check.
const contentLength = res.headers.get('content-length');

// Streaming: Known streaming content types
// - text/event-stream: Server-Sent Events (Vercel AI SDK, real-time APIs)
// - application/x-ndjson, application/ndjson: Newline-delimited JSON
// - application/stream+json: JSON streaming
// - text/plain (without Content-Length): Some AI APIs use this for streaming text
//
// The patterns are anchored at the start, case-insensitive, and end on a word boundary,
// so values with parameters (e.g. "text/event-stream; charset=utf-8") still match.
if (
/^text\/event-stream\b/i.test(contentType) ||
/^application\/(x-)?ndjson\b/i.test(contentType) ||
/^application\/stream\+json\b/i.test(contentType) ||
(/^text\/plain\b/i.test(contentType) && !contentLength)
) {
return { isStreaming: true };
}

// Default: treat as non-streaming
return { isStreaming: false };
}
34 changes: 29 additions & 5 deletions packages/cloudflare/test/durableobject.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,18 @@ describe('instrumentDurableObjectWithSentry', () => {
});

it('flush performs after all waitUntil promises are finished', async () => {
// Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');

// Measure delta instead of absolute call count to avoid interference from parallel tests.
// Since we spy on the prototype, other tests running in parallel may also call flush.
// By measuring before/after, we only verify that THIS test triggered exactly one flush call.
const before = flush.mock.calls.length;

const waitUntil = vi.fn();
const testClass = vi.fn(context => ({
fetch: () => {
Expand All @@ -133,12 +140,29 @@ describe('instrumentDurableObjectWithSentry', () => {
waitUntil,
} as unknown as ExecutionContext;
const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
expect(flush).not.toBeCalled();
expect(waitUntil).toHaveBeenCalledOnce();

// Call fetch (don't await yet)
const responsePromise = dObject.fetch(new Request('https://example.com'));

// Advance fake timers, then await the response (classification is header-based and does not wait on a timer)
vi.advanceTimersByTime(30);
const response = await responsePromise;

// Consume response (triggers span end for buffered responses)
await response.text();

// The flush should now be queued in waitUntil
expect(waitUntil).toHaveBeenCalled();

// Advance to trigger the setTimeout in the handler's waitUntil
vi.advanceTimersToNextTimer();
await Promise.all(waitUntil.mock.calls.map(([p]) => p));
expect(flush).toBeCalled();

const after = flush.mock.calls.length;
const delta = after - before;

// Verify that exactly one flush call was made during this test
expect(delta).toBe(1);
});

describe('instrumentPrototypeMethods option', () => {
Expand Down
6 changes: 5 additions & 1 deletion packages/cloudflare/test/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ describe('withSentry', () => {
createMockExecutionContext(),
);

expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content
expect(result?.status).toBe(response.status);
if (result) {
expect(await result.text()).toBe('test');
}
});

test('merges options from env and callback', async () => {
Expand Down
4 changes: 3 additions & 1 deletion packages/cloudflare/test/pages-plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => {
pluginArgs: MOCK_OPTIONS,
});

expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content
expect(result.status).toBe(response.status);
expect(await result.text()).toBe('test');
});
});
50 changes: 44 additions & 6 deletions packages/cloudflare/test/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ describe('withSentry', () => {
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
() => response,
);
expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content matches
expect(result.status).toBe(response.status);
expect(await result.text()).toBe('test');
});

test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => {
Expand All @@ -48,6 +50,25 @@ describe('withSentry', () => {
expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
});

test('handles streaming responses correctly', async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
controller.enqueue(new TextEncoder().encode('chunk2'));
controller.close();
},
});
const streamingResponse = new Response(stream);

const result = await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
() => streamingResponse,
);

const text = await result.text();
expect(text).toBe('chunk1chunk2');
});

test("doesn't error if context is undefined", () => {
expect(() =>
wrapRequestHandler(
Expand All @@ -69,11 +90,18 @@ describe('withSentry', () => {
});

test('flush must be called when all waitUntil are done', async () => {
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
// Spy on Client.prototype.flush and mock it to resolve immediately to avoid timeout issues with fake timers
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true);
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
});

// Measure delta instead of absolute call count to avoid interference from parallel tests.
// Since we spy on the prototype, other tests running in parallel may also call flush.
// By measuring before/after, we only verify that THIS test triggered exactly one flush call.
const before = flushSpy.mock.calls.length;

const waits: Promise<unknown>[] = [];
const waitUntil = vi.fn(promise => waits.push(promise));

Expand All @@ -83,13 +111,20 @@ describe('withSentry', () => {

await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
addDelayedWaitUntil(context);
return new Response('test');
const response = new Response('test');
// Add Content-Length so this text/plain response is classified as non-streaming
response.headers.set('content-length', '4');
return response;
});
expect(flush).not.toBeCalled();
expect(waitUntil).toBeCalled();
vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers());
vi.advanceTimersToNextTimer().runAllTimers();
await Promise.all(waits);
expect(flush).toHaveBeenCalledOnce();

const after = flushSpy.mock.calls.length;
const delta = after - before;

// Verify that exactly one flush call was made during this test
expect(delta).toBe(1);
});

describe('scope instrumentation', () => {
Expand Down Expand Up @@ -303,6 +338,9 @@ describe('withSentry', () => {
},
);

// Wait for async span end and transaction capture
await new Promise(resolve => setTimeout(resolve, 50));

expect(sentryEvent.transaction).toEqual('GET /');
expect(sentryEvent.spans).toHaveLength(0);
expect(sentryEvent.contexts?.trace).toEqual({
Expand Down