Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions src/commands/webhooks/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import { outputError } from '../../lib/output';
import { requireText } from '../../lib/prompts';
import { createSpinner } from '../../lib/spinner';
import { isInteractive } from '../../lib/tty';
import { resolveUpstream } from './resolve-upstream';
import { ALL_WEBHOOK_EVENTS, normalizeEvents } from './utils';

const SVIX_HEADERS = ['svix-id', 'svix-timestamp', 'svix-signature'];

const FORWARD_TIMEOUT_MS = 30_000;

function timestamp(): string {
return new Date().toLocaleTimeString('en-GB', { hour12: false });
}
Expand Down Expand Up @@ -95,6 +98,7 @@ async function forwardPayload(
method: 'POST',
headers: forwardHeaders,
body: rawBody,
signal: AbortSignal.timeout(FORWARD_TIMEOUT_MS),
});
return { status: resp.status };
}
Expand Down Expand Up @@ -223,6 +227,26 @@ For example, if using ngrok: ngrok http 4318`,

const { type, resourceId, detail } = summarizeEvent(body);

let forwardResult:
| { readonly status: number }
| { readonly error: string }
| undefined;

if (opts.forwardTo) {
try {
const { status } = await forwardPayload(
opts.forwardTo,
rawBody,
svixHeaders,
);
forwardResult = { status };
} catch (err) {
forwardResult = {
error: err instanceof Error ? err.message : 'Unknown error',
};
}
}

if (jsonMode) {
const entry: Record<string, unknown> = {
timestamp: new Date().toISOString(),
Expand All @@ -231,20 +255,11 @@ For example, if using ngrok: ngrok http 4318`,
payload: body,
};

if (opts.forwardTo) {
try {
const { status } = await forwardPayload(
opts.forwardTo,
rawBody,
svixHeaders,
);
entry.forwarded = { url: opts.forwardTo, status };
} catch (err) {
entry.forwarded = {
url: opts.forwardTo,
error: err instanceof Error ? err.message : 'Unknown error',
};
}
if (opts.forwardTo && forwardResult) {
entry.forwarded =
'error' in forwardResult
? { url: opts.forwardTo, error: forwardResult.error }
: { url: opts.forwardTo, status: forwardResult.status };
}

console.log(JSON.stringify(entry));
Expand All @@ -256,29 +271,24 @@ For example, if using ngrok: ngrok http 4318`,
`${ts} ${pc.bold(typePad)} ${pc.cyan(idPad)} ${detail}\n`,
);

if (opts.forwardTo) {
if (opts.forwardTo && forwardResult) {
const target = opts.forwardTo.startsWith('http')
? opts.forwardTo
: `http://${opts.forwardTo}`;
try {
const { status } = await forwardPayload(
opts.forwardTo,
rawBody,
svixHeaders,
);
if ('error' in forwardResult) {
process.stderr.write(
`${pc.dim(' -> POST')} ${target} ${pc.dim(`[${formatStatus(status)}]`)}\n`,
`${pc.dim(' -> POST')} ${target} ${pc.red(`[Error: ${forwardResult.error}]`)}\n`,
);
} catch (err) {
const msg = err instanceof Error ? err.message : 'Unknown error';
} else {
process.stderr.write(
`${pc.dim(' -> POST')} ${target} ${pc.red(`[Error: ${msg}]`)}\n`,
`${pc.dim(' -> POST')} ${target} ${pc.dim(`[${formatStatus(forwardResult.status)}]`)}\n`,
);
}
}
}

res.writeHead(200).end('OK');
const upstream = resolveUpstream(forwardResult);
res.writeHead(upstream.status).end(upstream.body);
},
);

Expand Down
15 changes: 15 additions & 0 deletions src/commands/webhooks/resolve-upstream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
type ForwardResult = { readonly status: number } | { readonly error: string };

export const resolveUpstream = (
result: ForwardResult | undefined,
): { readonly status: number; readonly body: string } => {
if (!result) {
return { status: 200, body: 'OK' };
}
if ('error' in result) {
return { status: 502, body: 'Forward target unreachable' };
}
return result.status >= 200 && result.status < 300
? { status: 200, body: 'OK' }
: { status: result.status, body: 'Forward target failed' };
};
178 changes: 178 additions & 0 deletions tests/commands/webhooks/listen-forward.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { createServer, type Server } from 'node:http';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
captureTestEnv,
setNonInteractive,
setupOutputSpies,
} from '../../helpers';

const mockCreate = vi.fn(async () => ({
data: { object: 'webhook' as const, id: 'wh_test', signing_secret: 'sec' },
error: null,
}));
const mockRemove = vi.fn(async () => ({ data: null, error: null }));

vi.mock('resend', () => ({
Resend: class MockResend {
constructor(public key: string) {}
webhooks = { create: mockCreate, remove: mockRemove };
},
}));

const postJSON = async (
port: number,
body: Record<string, unknown>,
): Promise<{ status: number; body: string }> => {
const resp = await fetch(`http://127.0.0.1:${port}`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
});
return { status: resp.status, body: await resp.text() };
};

const startTargetServer = (
responseStatus: number,
): Promise<{ server: Server; port: number }> =>
new Promise((resolve) => {
const server = createServer((_req, res) => {
res.writeHead(responseStatus).end('target response');
});
server.listen(0, '127.0.0.1', () => {
const addr = server.address();
const port = typeof addr === 'object' && addr ? addr.port : 0;
resolve({ server, port });
});
});

describe('webhook listen --forward-to status propagation', () => {
const restoreEnv = captureTestEnv();
let listenerAbort: AbortController | undefined;
let targetServer: Server | undefined;

beforeEach(() => {
process.env.RESEND_API_KEY = 're_test_key';
mockCreate.mockClear();
mockRemove.mockClear();
});

afterEach(() => {
restoreEnv();
listenerAbort?.abort();
listenerAbort = undefined;
targetServer?.close();
targetServer = undefined;
process.removeAllListeners('SIGINT');
process.removeAllListeners('SIGTERM');
});

const startListener = async (forwardPort: number): Promise<number> => {
setNonInteractive();
setupOutputSpies();

const { listenWebhookCommand } = await import(
'../../../src/commands/webhooks/listen'
);

const listenerPort = 10_000 + Math.floor(Math.random() * 50_000);

listenerAbort = new AbortController();

listenWebhookCommand
.parseAsync(
[
'--url',
'https://tunnel.example.com',
'--port',
String(listenerPort),
'--forward-to',
`http://127.0.0.1:${forwardPort}/webhook`,
],
{ from: 'user' },
)
.catch(() => {});

await new Promise((r) => setTimeout(r, 500));

return listenerPort;
};

it('returns 200 when forward target responds 200', async () => {
const target = await startTargetServer(200);
targetServer = target.server;
const listenerPort = await startListener(target.port);

const result = await postJSON(listenerPort, {
type: 'email.sent',
data: { id: 'evt_1' },
});

expect(result.status).toBe(200);
expect(result.body).toBe('OK');
});

it('propagates 500 when forward target responds 500', async () => {
const target = await startTargetServer(500);
targetServer = target.server;
const listenerPort = await startListener(target.port);

const result = await postJSON(listenerPort, {
type: 'email.sent',
data: { id: 'evt_2' },
});

expect(result.status).toBe(500);
expect(result.body).toBe('Forward target failed');
});

it('propagates 401 when forward target responds 401', async () => {
const target = await startTargetServer(401);
targetServer = target.server;
const listenerPort = await startListener(target.port);

const result = await postJSON(listenerPort, {
type: 'email.sent',
data: { id: 'evt_3' },
});

expect(result.status).toBe(401);
expect(result.body).toBe('Forward target failed');
});

it('returns 502 when forward target is unreachable', async () => {
setNonInteractive();
setupOutputSpies();

const { listenWebhookCommand } = await import(
'../../../src/commands/webhooks/listen'
);

const listenerPort = 10_000 + Math.floor(Math.random() * 50_000);

listenerAbort = new AbortController();

listenWebhookCommand
.parseAsync(
[
'--url',
'https://tunnel.example.com',
'--port',
String(listenerPort),
'--forward-to',
'http://127.0.0.1:1/unreachable',
],
{ from: 'user' },
)
.catch(() => {});

await new Promise((r) => setTimeout(r, 500));

const result = await postJSON(listenerPort, {
type: 'email.sent',
data: { id: 'evt_4' },
});

expect(result.status).toBe(502);
expect(result.body).toBe('Forward target unreachable');
});
});
73 changes: 73 additions & 0 deletions tests/commands/webhooks/resolve-upstream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { describe, expect, it } from 'vitest';
import { resolveUpstream } from '../../../src/commands/webhooks/resolve-upstream';

describe('resolveUpstream', () => {
it('returns 200 OK when no forward result is provided', () => {
expect(resolveUpstream(undefined)).toEqual({ status: 200, body: 'OK' });
});

it('returns 200 OK for a 2xx downstream status', () => {
expect(resolveUpstream({ status: 200 })).toEqual({
status: 200,
body: 'OK',
});
expect(resolveUpstream({ status: 201 })).toEqual({
status: 200,
body: 'OK',
});
expect(resolveUpstream({ status: 204 })).toEqual({
status: 200,
body: 'OK',
});
});

it('propagates downstream 4xx status', () => {
expect(resolveUpstream({ status: 401 })).toEqual({
status: 401,
body: 'Forward target failed',
});
expect(resolveUpstream({ status: 404 })).toEqual({
status: 404,
body: 'Forward target failed',
});
});

it('propagates downstream 5xx status', () => {
expect(resolveUpstream({ status: 500 })).toEqual({
status: 500,
body: 'Forward target failed',
});
expect(resolveUpstream({ status: 503 })).toEqual({
status: 503,
body: 'Forward target failed',
});
});

it('returns 502 for a forwarding error', () => {
expect(resolveUpstream({ error: 'Connection refused' })).toEqual({
status: 502,
body: 'Forward target unreachable',
});
});

it('returns 502 for an unknown forwarding error', () => {
expect(resolveUpstream({ error: 'Unknown error' })).toEqual({
status: 502,
body: 'Forward target unreachable',
});
});

it('treats boundary status 300 as failure', () => {
expect(resolveUpstream({ status: 300 })).toEqual({
status: 300,
body: 'Forward target failed',
});
});

it('treats boundary status 199 as failure', () => {
expect(resolveUpstream({ status: 199 })).toEqual({
status: 199,
body: 'Forward target failed',
});
});
});
Loading