From 4a4346bd7f865317a93a417813474ad49011ba98 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 9 Apr 2026 17:30:48 +0000 Subject: [PATCH 1/3] fix: propagate downstream failure status in webhook forwarding When --forward-to is set, the listener now returns the downstream HTTP status to Resend instead of always returning 200 OK. This preserves Resend's retry behavior when the local handler fails. - Non-2xx downstream responses are propagated as-is to the upstream - Forwarding errors (connection refused, timeout) return 502 - Added AbortSignal.timeout(30s) to prevent stalled forwarding - Extracted resolveUpstream to its own utility file Co-authored-by: Bu Kinoshita --- src/commands/webhooks/listen.ts | 62 +++--- src/commands/webhooks/resolve-upstream.ts | 15 ++ .../commands/webhooks/listen-forward.test.ts | 178 ++++++++++++++++++ .../webhooks/resolve-upstream.test.ts | 73 +++++++ 4 files changed, 302 insertions(+), 26 deletions(-) create mode 100644 src/commands/webhooks/resolve-upstream.ts create mode 100644 tests/commands/webhooks/listen-forward.test.ts create mode 100644 tests/commands/webhooks/resolve-upstream.test.ts diff --git a/src/commands/webhooks/listen.ts b/src/commands/webhooks/listen.ts index f71ba161..7f60599a 100644 --- a/src/commands/webhooks/listen.ts +++ b/src/commands/webhooks/listen.ts @@ -14,10 +14,13 @@ import { outputError } from '../../lib/output'; import { requireText } from '../../lib/prompts'; import { createSpinner } from '../../lib/spinner'; import { isInteractive } from '../../lib/tty'; +import { resolveUpstream } from './resolve-upstream'; import { ALL_WEBHOOK_EVENTS, normalizeEvents } from './utils'; const SVIX_HEADERS = ['svix-id', 'svix-timestamp', 'svix-signature']; +const FORWARD_TIMEOUT_MS = 30_000; + function timestamp(): string { return new Date().toLocaleTimeString('en-GB', { hour12: false }); } @@ -95,6 +98,7 @@ async function forwardPayload( method: 'POST', headers: forwardHeaders, body: rawBody, + signal: AbortSignal.timeout(FORWARD_TIMEOUT_MS), }); return { status: resp.status }; } @@ -223,6 +227,26 @@ For example, if using ngrok: ngrok http 4318`, const { type, resourceId, detail } = summarizeEvent(body); + let forwardResult: + | { readonly status: number } + | { readonly error: string } + | undefined; + + if (opts.forwardTo) { + try { + const { status } = await forwardPayload( + opts.forwardTo, + rawBody, + svixHeaders, + ); + forwardResult = { status }; + } catch (err) { + forwardResult = { + error: err instanceof Error ? err.message : 'Unknown error', + }; + } + } + if (jsonMode) { const entry: Record = { timestamp: new Date().toISOString(), @@ -231,20 +255,11 @@ For example, if using ngrok: ngrok http 4318`, payload: body, }; - if (opts.forwardTo) { - try { - const { status } = await forwardPayload( - opts.forwardTo, - rawBody, - svixHeaders, - ); - entry.forwarded = { url: opts.forwardTo, status }; - } catch (err) { - entry.forwarded = { - url: opts.forwardTo, - error: err instanceof Error ? err.message : 'Unknown error', - }; - } + if (opts.forwardTo && forwardResult) { + entry.forwarded = + 'error' in forwardResult + ? { url: opts.forwardTo, error: forwardResult.error } + : { url: opts.forwardTo, status: forwardResult.status }; } console.log(JSON.stringify(entry)); @@ -256,29 +271,24 @@ For example, if using ngrok: ngrok http 4318`, `${ts} ${pc.bold(typePad)} ${pc.cyan(idPad)} ${detail}\n`, ); - if (opts.forwardTo) { + if (opts.forwardTo && forwardResult) { const target = opts.forwardTo.startsWith('http') ? opts.forwardTo : `http://${opts.forwardTo}`; - try { - const { status } = await forwardPayload( - opts.forwardTo, - rawBody, - svixHeaders, - ); + if ('error' in forwardResult) { process.stderr.write( - `${pc.dim(' -> POST')} ${target} ${pc.dim(`[${formatStatus(status)}]`)}\n`, + `${pc.dim(' -> POST')} ${target} ${pc.red(`[Error: ${forwardResult.error}]`)}\n`, ); - } catch (err) { - const msg = err instanceof Error ? err.message : 'Unknown error'; + } else { process.stderr.write( - `${pc.dim(' -> POST')} ${target} ${pc.red(`[Error: ${msg}]`)}\n`, + `${pc.dim(' -> POST')} ${target} ${pc.dim(`[${formatStatus(forwardResult.status)}]`)}\n`, ); } } } - res.writeHead(200).end('OK'); + const upstream = resolveUpstream(forwardResult); + res.writeHead(upstream.status).end(upstream.body); }, ); diff --git a/src/commands/webhooks/resolve-upstream.ts b/src/commands/webhooks/resolve-upstream.ts new file mode 100644 index 00000000..fdac2ecc --- /dev/null +++ b/src/commands/webhooks/resolve-upstream.ts @@ -0,0 +1,15 @@ +type ForwardResult = { readonly status: number } | { readonly error: string }; + +export const resolveUpstream = ( + result: ForwardResult | undefined, +): { readonly status: number; readonly body: string } => { + if (!result) { + return { status: 200, body: 'OK' }; + } + if ('error' in result) { + return { status: 502, body: 'Forward target unreachable' }; + } + return result.status >= 200 && result.status < 300 + ? { status: 200, body: 'OK' } + : { status: result.status, body: 'Forward target failed' }; +}; diff --git a/tests/commands/webhooks/listen-forward.test.ts b/tests/commands/webhooks/listen-forward.test.ts new file mode 100644 index 00000000..ffab80ee --- /dev/null +++ b/tests/commands/webhooks/listen-forward.test.ts @@ -0,0 +1,178 @@ +import { createServer, type Server } from 'node:http'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + captureTestEnv, + setNonInteractive, + setupOutputSpies, +} from '../../helpers'; + +const mockCreate = vi.fn(async () => ({ + data: { object: 'webhook' as const, id: 'wh_test', signing_secret: 'sec' }, + error: null, +})); +const mockRemove = vi.fn(async () => ({ data: null, error: null })); + +vi.mock('resend', () => ({ + Resend: class MockResend { + constructor(public key: string) {} + webhooks = { create: mockCreate, remove: mockRemove }; + }, +})); + +const postJSON = async ( + port: number, + body: Record, +): Promise<{ status: number; body: string }> => { + const resp = await fetch(`http://127.0.0.1:${port}`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify(body), + }); + return { status: resp.status, body: await resp.text() }; +}; + +const startTargetServer = ( + responseStatus: number, +): Promise<{ server: Server; port: number }> => + new Promise((resolve) => { + const server = createServer((_req, res) => { + res.writeHead(responseStatus).end('target response'); + }); + server.listen(0, '127.0.0.1', () => { + const addr = server.address(); + const port = typeof addr === 'object' && addr ? addr.port : 0; + resolve({ server, port }); + }); + }); + +describe('webhook listen --forward-to status propagation', () => { + const restoreEnv = captureTestEnv(); + let listenerAbort: AbortController | undefined; + let targetServer: Server | undefined; + + beforeEach(() => { + process.env.RESEND_API_KEY = 're_test_key'; + mockCreate.mockClear(); + mockRemove.mockClear(); + }); + + afterEach(() => { + restoreEnv(); + listenerAbort?.abort(); + listenerAbort = undefined; + targetServer?.close(); + targetServer = undefined; + process.removeAllListeners('SIGINT'); + process.removeAllListeners('SIGTERM'); + }); + + const startListener = async (forwardPort: number): Promise => { + setNonInteractive(); + setupOutputSpies(); + + const { listenWebhookCommand } = await import( + '../../../src/commands/webhooks/listen' + ); + + const listenerPort = 10_000 + Math.floor(Math.random() * 50_000); + + listenerAbort = new AbortController(); + + listenWebhookCommand + .parseAsync( + [ + '--url', + 'https://tunnel.example.com', + '--port', + String(listenerPort), + '--forward-to', + `http://127.0.0.1:${forwardPort}/webhook`, + ], + { from: 'user' }, + ) + .catch(() => {}); + + await new Promise((r) => setTimeout(r, 500)); + + return listenerPort; + }; + + it('returns 200 when forward target responds 200', async () => { + const target = await startTargetServer(200); + targetServer = target.server; + const listenerPort = await startListener(target.port); + + const result = await postJSON(listenerPort, { + type: 'email.sent', + data: { id: 'evt_1' }, + }); + + expect(result.status).toBe(200); + expect(result.body).toBe('OK'); + }); + + it('propagates 500 when forward target responds 500', async () => { + const target = await startTargetServer(500); + targetServer = target.server; + const listenerPort = await startListener(target.port); + + const result = await postJSON(listenerPort, { + type: 'email.sent', + data: { id: 'evt_2' }, + }); + + expect(result.status).toBe(500); + expect(result.body).toBe('Forward target failed'); + }); + + it('propagates 401 when forward target responds 401', async () => { + const target = await startTargetServer(401); + targetServer = target.server; + const listenerPort = await startListener(target.port); + + const result = await postJSON(listenerPort, { + type: 'email.sent', + data: { id: 'evt_3' }, + }); + + expect(result.status).toBe(401); + expect(result.body).toBe('Forward target failed'); + }); + + it('returns 502 when forward target is unreachable', async () => { + setNonInteractive(); + setupOutputSpies(); + + const { listenWebhookCommand } = await import( + '../../../src/commands/webhooks/listen' + ); + + const listenerPort = 10_000 + Math.floor(Math.random() * 50_000); + + listenerAbort = new AbortController(); + + listenWebhookCommand + .parseAsync( + [ + '--url', + 'https://tunnel.example.com', + '--port', + String(listenerPort), + '--forward-to', + 'http://127.0.0.1:1/unreachable', + ], + { from: 'user' }, + ) + .catch(() => {}); + + await new Promise((r) => setTimeout(r, 500)); + + const result = await postJSON(listenerPort, { + type: 'email.sent', + data: { id: 'evt_4' }, + }); + + expect(result.status).toBe(502); + expect(result.body).toBe('Forward target unreachable'); + }); +}); diff --git a/tests/commands/webhooks/resolve-upstream.test.ts b/tests/commands/webhooks/resolve-upstream.test.ts new file mode 100644 index 00000000..c44e076a --- /dev/null +++ b/tests/commands/webhooks/resolve-upstream.test.ts @@ -0,0 +1,73 @@ +import { describe, expect, it } from 'vitest'; +import { resolveUpstream } from '../../../src/commands/webhooks/resolve-upstream'; + +describe('resolveUpstream', () => { + it('returns 200 OK when no forward result is provided', () => { + expect(resolveUpstream(undefined)).toEqual({ status: 200, body: 'OK' }); + }); + + it('returns 200 OK for a 2xx downstream status', () => { + expect(resolveUpstream({ status: 200 })).toEqual({ + status: 200, + body: 'OK', + }); + expect(resolveUpstream({ status: 201 })).toEqual({ + status: 200, + body: 'OK', + }); + expect(resolveUpstream({ status: 204 })).toEqual({ + status: 200, + body: 'OK', + }); + }); + + it('propagates downstream 4xx status', () => { + expect(resolveUpstream({ status: 401 })).toEqual({ + status: 401, + body: 'Forward target failed', + }); + expect(resolveUpstream({ status: 404 })).toEqual({ + status: 404, + body: 'Forward target failed', + }); + }); + + it('propagates downstream 5xx status', () => { + expect(resolveUpstream({ status: 500 })).toEqual({ + status: 500, + body: 'Forward target failed', + }); + expect(resolveUpstream({ status: 503 })).toEqual({ + status: 503, + body: 'Forward target failed', + }); + }); + + it('returns 502 for a forwarding error', () => { + expect(resolveUpstream({ error: 'Connection refused' })).toEqual({ + status: 502, + body: 'Forward target unreachable', + }); + }); + + it('returns 502 for an unknown forwarding error', () => { + expect(resolveUpstream({ error: 'Unknown error' })).toEqual({ + status: 502, + body: 'Forward target unreachable', + }); + }); + + it('treats boundary status 300 as failure', () => { + expect(resolveUpstream({ status: 300 })).toEqual({ + status: 300, + body: 'Forward target failed', + }); + }); + + it('treats boundary status 199 as failure', () => { + expect(resolveUpstream({ status: 199 })).toEqual({ + status: 199, + body: 'Forward target failed', + }); + }); +}); From 8a9b7b73bdf7be72671a3fe624c636bd1e46d1f6 Mon Sep 17 00:00:00 2001 From: Bu Kinoshita Date: Mon, 13 Apr 2026 22:11:19 +0000 Subject: [PATCH 2/3] fix: minor tweak --- tests/commands/webhooks/listen-forward.test.ts | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/commands/webhooks/listen-forward.test.ts b/tests/commands/webhooks/listen-forward.test.ts index ffab80ee..b01e9d08 100644 --- a/tests/commands/webhooks/listen-forward.test.ts +++ b/tests/commands/webhooks/listen-forward.test.ts @@ -47,7 +47,6 @@ const startTargetServer = ( describe('webhook listen --forward-to status propagation', () => { const restoreEnv = captureTestEnv(); - let listenerAbort: AbortController | undefined; let targetServer: Server | undefined; beforeEach(() => { @@ -56,10 +55,16 @@ describe('webhook listen --forward-to status propagation', () => { mockRemove.mockClear(); }); - afterEach(() => { + afterEach(async () => { restoreEnv(); - listenerAbort?.abort(); - listenerAbort = undefined; + + const exitStub = vi + .spyOn(process, 'exit') + .mockImplementation((() => {}) as never); + process.emit('SIGTERM', 'SIGTERM'); + await new Promise((r) => setTimeout(r, 200)); + exitStub.mockRestore(); + targetServer?.close(); targetServer = undefined; process.removeAllListeners('SIGINT'); @@ -76,8 +81,6 @@ describe('webhook listen --forward-to status propagation', () => { const listenerPort = 10_000 + Math.floor(Math.random() * 50_000); - listenerAbort = new AbortController(); - listenWebhookCommand .parseAsync( [ @@ -149,8 +152,6 @@ describe('webhook listen --forward-to status propagation', () => { const listenerPort = 10_000 + Math.floor(Math.random() * 50_000); - listenerAbort = new AbortController(); - listenWebhookCommand .parseAsync( [ From 1d52c6d00ee4b9e320280da472da20d573c1d990 Mon Sep 17 00:00:00 2001 From: Felipe Freitag Date: Tue, 14 Apr 2026 15:47:22 -0300 Subject: [PATCH 3/3] fix: distinguish timeout from unreachable, harden request handler - Export ForwardResult type from resolve-upstream and reuse in listen.ts - Detect AbortSignal timeout (DOMException/TimeoutError) and map to 504 - Wrap request handler in try/catch to prevent unhandled rejections from readBody --- src/commands/webhooks/listen.ts | 155 +++++++++--------- src/commands/webhooks/resolve-upstream.ts | 17 +- .../webhooks/resolve-upstream.test.ts | 9 + 3 files changed, 101 insertions(+), 80 deletions(-) diff --git a/src/commands/webhooks/listen.ts b/src/commands/webhooks/listen.ts index 7880527e..61e03a34 100644 --- a/src/commands/webhooks/listen.ts +++ b/src/commands/webhooks/listen.ts @@ -15,7 +15,7 @@ import { requireText } from '../../lib/prompts'; import { safeTerminalText } from '../../lib/safe-terminal-text'; import { createSpinner } from '../../lib/spinner'; import { isInteractive } from '../../lib/tty'; -import { resolveUpstream } from './resolve-upstream'; +import { type ForwardResult, resolveUpstream } from './resolve-upstream'; import { ALL_WEBHOOK_EVENTS, normalizeEvents } from './utils'; const SVIX_HEADERS = ['svix-id', 'svix-timestamp', 'svix-signature']; @@ -77,6 +77,7 @@ function statusText(code: number): string { 500: 'Internal Server Error', 502: 'Bad Gateway', 503: 'Service Unavailable', + 504: 'Gateway Timeout', }; return map[code] ?? ''; } @@ -208,90 +209,96 @@ For example, if using ngrok: ngrok http 4318`, // Start local server const server = createServer( async (req: IncomingMessage, res: ServerResponse) => { - if (req.method !== 'POST') { - res.writeHead(405).end('Method not allowed'); - return; - } - - const rawBody = await readBody(req); - let body: Record; try { - body = JSON.parse(rawBody); - } catch { - res.writeHead(400).end('Invalid JSON'); - return; - } - - const svixHeaders: Record = {}; - for (const h of SVIX_HEADERS) { - const val = req.headers[h]; - svixHeaders[h] = Array.isArray(val) ? val[0] : val; - } - - const { type, resourceId, detail } = summarizeEvent(body); - - let forwardResult: - | { readonly status: number } - | { readonly error: string } - | undefined; + if (req.method !== 'POST') { + res.writeHead(405).end('Method not allowed'); + return; + } - if (opts.forwardTo) { + const rawBody = await readBody(req); + let body: Record; try { - const { status } = await forwardPayload( - opts.forwardTo, - rawBody, - svixHeaders, - ); - forwardResult = { status }; - } catch (err) { - forwardResult = { - error: err instanceof Error ? err.message : 'Unknown error', - }; + body = JSON.parse(rawBody); + } catch { + res.writeHead(400).end('Invalid JSON'); + return; } - } - if (jsonMode) { - const entry: Record = { - timestamp: new Date().toISOString(), - type, - resource_id: resourceId, - payload: body, - }; - - if (opts.forwardTo && forwardResult) { - entry.forwarded = - 'error' in forwardResult - ? { url: opts.forwardTo, error: forwardResult.error } - : { url: opts.forwardTo, status: forwardResult.status }; + const svixHeaders: Record = {}; + for (const h of SVIX_HEADERS) { + const val = req.headers[h]; + svixHeaders[h] = Array.isArray(val) ? val[0] : val; } - console.log(JSON.stringify(entry)); - } else { - const ts = pc.dim(`[${timestamp()}]`); - const typePad = type.padEnd(20); - const idPad = resourceId.padEnd(14); - process.stderr.write( - `${ts} ${pc.bold(typePad)} ${pc.cyan(idPad)} ${detail}\n`, - ); - - if (opts.forwardTo && forwardResult) { - const target = opts.forwardTo.startsWith('http') - ? opts.forwardTo - : `http://${opts.forwardTo}`; - if ('error' in forwardResult) { - process.stderr.write( - `${pc.dim(' -> POST')} ${target} ${pc.red(`[Error: ${forwardResult.error}]`)}\n`, - ); - } else { - process.stderr.write( - `${pc.dim(' -> POST')} ${target} ${pc.dim(`[${formatStatus(forwardResult.status)}]`)}\n`, + const { type, resourceId, detail } = summarizeEvent(body); + + let forwardResult: ForwardResult | undefined; + + if (opts.forwardTo) { + try { + const { status } = await forwardPayload( + opts.forwardTo, + rawBody, + svixHeaders, ); + forwardResult = { status }; + } catch (err) { + const isTimeout = + err instanceof DOMException && err.name === 'TimeoutError'; + forwardResult = { + error: err instanceof Error ? err.message : 'Unknown error', + timeout: isTimeout, + }; + } + } + + if (jsonMode) { + const entry: Record = { + timestamp: new Date().toISOString(), + type, + resource_id: resourceId, + payload: body, + }; + + if (opts.forwardTo && forwardResult) { + entry.forwarded = + 'error' in forwardResult + ? { url: opts.forwardTo, error: forwardResult.error } + : { url: opts.forwardTo, status: forwardResult.status }; + } + + console.log(JSON.stringify(entry)); + } else { + const ts = pc.dim(`[${timestamp()}]`); + const typePad = type.padEnd(20); + const idPad = resourceId.padEnd(14); + process.stderr.write( + `${ts} ${pc.bold(typePad)} ${pc.cyan(idPad)} ${detail}\n`, + ); + + if (opts.forwardTo && forwardResult) { + const target = opts.forwardTo.startsWith('http') + ? opts.forwardTo + : `http://${opts.forwardTo}`; + if ('error' in forwardResult) { + process.stderr.write( + `${pc.dim(' -> POST')} ${target} ${pc.red(`[Error: ${forwardResult.error}]`)}\n`, + ); + } else { + process.stderr.write( + `${pc.dim(' -> POST')} ${target} ${pc.dim(`[${formatStatus(forwardResult.status)}]`)}\n`, + ); + } } } - } - const upstream = resolveUpstream(forwardResult); - res.writeHead(upstream.status).end(upstream.body); + const upstream = resolveUpstream(forwardResult); + res.writeHead(upstream.status).end(upstream.body); + } catch { + if (!res.headersSent) { + res.writeHead(500).end('Internal error'); + } + } }, ); diff --git a/src/commands/webhooks/resolve-upstream.ts b/src/commands/webhooks/resolve-upstream.ts index fdac2ecc..51cbd6b0 100644 --- a/src/commands/webhooks/resolve-upstream.ts +++ b/src/commands/webhooks/resolve-upstream.ts @@ -1,15 +1,20 @@ -type ForwardResult = { readonly status: number } | { readonly error: string }; +export type ForwardResult = + | { readonly status: number } + | { readonly error: string; readonly timeout?: boolean }; -export const resolveUpstream = ( - result: ForwardResult | undefined, -): { readonly status: number; readonly body: string } => { +export function resolveUpstream(result: ForwardResult | undefined): { + readonly status: number; + readonly body: string; +} { if (!result) { return { status: 200, body: 'OK' }; } if ('error' in result) { - return { status: 502, body: 'Forward target unreachable' }; + return result.timeout + ? { status: 504, body: 'Forward target timed out' } + : { status: 502, body: 'Forward target unreachable' }; } return result.status >= 200 && result.status < 300 ? { status: 200, body: 'OK' } : { status: result.status, body: 'Forward target failed' }; -}; +} diff --git a/tests/commands/webhooks/resolve-upstream.test.ts b/tests/commands/webhooks/resolve-upstream.test.ts index c44e076a..b3e20263 100644 --- a/tests/commands/webhooks/resolve-upstream.test.ts +++ b/tests/commands/webhooks/resolve-upstream.test.ts @@ -57,6 +57,15 @@ describe('resolveUpstream', () => { }); }); + it('returns 504 for a timeout error', () => { + expect( + resolveUpstream({ error: 'The operation timed out', timeout: true }), + ).toEqual({ + status: 504, + body: 'Forward target timed out', + }); + }); + it('treats boundary status 300 as failure', () => { expect(resolveUpstream({ status: 300 })).toEqual({ status: 300,