Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions apps/app/src/components/QueryInvalidationSubscriber.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use client';

import { trpc } from '@op/api/client';
import type { ChannelName, RegistryEvents } from '@op/common/realtime';
import { queryChannelRegistry } from '@op/common/realtime';
import { RealtimeManager } from '@op/realtime/client';
Expand Down Expand Up @@ -41,15 +40,12 @@ export function QueryInvalidationSubscriber() {
function useInvalidateQueries(): void {
const queryClient = useRequiredQueryClient();
const invalidatedMutationIds = useRef(new Set<string>());
const utils = trpc.useUtils();
const unsubscribersRef = useRef<Map<ChannelName, () => void>>(new Map());
const initializedRef = useRef(false);

// Store refs to avoid effect re-runs
const queryClientRef = useRef(queryClient);
const utilsRef = useRef(utils);
queryClientRef.current = queryClient;
utilsRef.current = utils;

const handleInvalidation = useCallback(
async ({ channels, mutationId }: RegistryEvents['mutation:added']) => {
Expand Down Expand Up @@ -87,21 +83,19 @@ function useInvalidateQueries(): void {
* Subscribe to WebSocket channel when a query registers interest in channels
*/
useEffect(() => {
const wsUrl = process.env.NEXT_PUBLIC_CENTRIFUGO_WS_URL;
const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_URL;
const supabaseAnonKey = process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY;

// Skip realtime subscriptions entirely if WebSocket URL is not configured
if (!wsUrl) {
// Skip realtime subscriptions entirely if Supabase config is not available
if (!supabaseUrl || !supabaseAnonKey) {
return;
}

// Initialize RealtimeManager only once
if (!initializedRef.current) {
RealtimeManager.initialize({
wsUrl,
getToken: async () => {
const { token } = await utilsRef.current.realtime.getToken.fetch();
return token;
},
supabaseUrl,
supabaseAnonKey,
});
initializedRef.current = true;
}
Expand Down
14 changes: 3 additions & 11 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/realtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
},
"dependencies": {
"@op/common": "workspace:*",
"centrifuge": "^5.5.2",
"@supabase/supabase-js": "^2.49.3",
"jsonwebtoken": "^9.0.2",
"zod": "catalog:"
},
Expand Down
6 changes: 5 additions & 1 deletion services/realtime/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
export { type RealtimeConfig, RealtimeManager } from './manager';
export {
type RealtimeConfig,
type RealtimeHandler,
RealtimeManager,
} from './manager';
export type { RealtimeMessage } from '../schemas';
126 changes: 9 additions & 117 deletions services/realtime/src/client/manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
import { Channels } from '@op/common/realtime';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import WebSocket from 'ws';

import type { RealtimeMessage } from '../schemas';
import { RealtimeClient } from '../server/client';
import { generateConnectionToken } from '../server/token';
import { type RealtimeHandler, RealtimeManager } from './manager';

// Make WebSocket available globally for Centrifuge
global.WebSocket = WebSocket as any;

describe.concurrent('RealtimeManager', () => {
let realtimeClient: RealtimeClient;
const WS_URL = process.env.CENTRIFUGO_WS_URL!;
const API_URL = process.env.CENTRIFUGO_API_URL!;
const API_KEY = process.env.CENTRIFUGO_API_KEY!;
const SUPABASE_URL = process.env.SUPABASE_URL!;
const SUPABASE_ANON_KEY = process.env.SUPABASE_ANON_KEY!;
const SUPABASE_SERVICE_ROLE_KEY = process.env.SUPABASE_SERVICE_ROLE_KEY!;

beforeAll(() => {
// Initialize the server client for publishing messages
realtimeClient = new RealtimeClient({
apiUrl: API_URL,
apiKey: API_KEY,
supabaseUrl: SUPABASE_URL,
serviceRoleKey: SUPABASE_SERVICE_ROLE_KEY,
});
});

Expand All @@ -32,18 +27,12 @@ describe.concurrent('RealtimeManager', () => {
});

it('should connect, subscribe to a channel, and receive published messages', async () => {
const TEST_USER_ID = 'test-user-connect';
const TEST_CHANNEL = Channels.org('test-connect');

// Generate a real token using the token generation function
const getToken = async () => {
return generateConnectionToken(TEST_USER_ID);
};

// Initialize the RealtimeManager
RealtimeManager.initialize({
wsUrl: WS_URL,
getToken,
supabaseUrl: SUPABASE_URL,
supabaseAnonKey: SUPABASE_ANON_KEY,
});

const manager = RealtimeManager.getInstance();
Expand Down Expand Up @@ -81,113 +70,16 @@ describe.concurrent('RealtimeManager', () => {
expect(receivedMessage).toEqual(testMessage);
});

it.each([
{ token: '', description: 'empty token' },
{ token: 'random-token', description: 'invalid token' },
])(
'should not allow unauthenticated users to connect to channels with $description',
async ({ token, description }) => {
const TEST_CHANNEL = Channels.org(
`test-unauth-${description.replace(/\s+/g, '-')}`,
);

// Create a new instance to avoid interference with other tests
// Force reset the singleton by accessing private static property
(RealtimeManager as any).instance = null;

// Initialize with an invalid/empty token to simulate unauthenticated user
const getToken = async () => {
return token;
};

RealtimeManager.initialize({
wsUrl: WS_URL,
getToken,
});

const manager = RealtimeManager.getInstance();

// Track disconnect reason to verify it's auth-related
let disconnectReason: any = null;

const disconnectionPromise = new Promise<boolean>((resolve) => {
manager.addConnectionListener((isConnected: boolean) => {
if (!isConnected) {
resolve(true);
}
});

// If no disconnection happens within 1 second, assume connection might have succeeded
setTimeout(() => resolve(false), 1000);
});

// Trigger connection and hook into centrifuge events immediately
const messageReceived = new Promise<boolean>((resolve) => {
const handler = () => {
resolve(true);
};

// Attempt to subscribe - this triggers ensureConnected()
manager.subscribe(TEST_CHANNEL, handler);

// Immediately after subscribe, access centrifuge and hook into events
// Use setTimeout to ensure centrifuge is initialized
setTimeout(() => {
const centrifugeInstance = (manager as any).centrifuge;
if (centrifugeInstance) {
centrifugeInstance.on('disconnected', (ctx: any) => {
disconnectReason = ctx;
});
}
}, 0);

// Set a timer to resolve false if no message is received
setTimeout(() => resolve(false), 1000);
});

// Wait for disconnection or timeout
const disconnected = await disconnectionPromise;

// Publish a test message
const testMessage: RealtimeMessage = { mutationId: 'test-mutation-2' };

await realtimeClient.publish({
channel: TEST_CHANNEL,
data: testMessage,
});

// Wait to see if the message is received
const received = await messageReceived;

// Verify that either:
// 1. The connection was never established (disconnected)
// 2. No message was received (because connection failed)
expect(disconnected || !received).toBe(true);

// Verify the disconnect reason indicates authentication failure
expect(disconnectReason).toBeDefined();
// Both 3500 (invalid token) and 3501 (bad request) are auth-related errors
expect([3500, 3501]).toContain(disconnectReason.code);
expect(disconnectReason.reason).toBeDefined();
},
);

it('should deliver messages to multiple subscribers on the same channel', async () => {
const TEST_USER_ID = 'test-user-multiple';
const TEST_CHANNEL = Channels.org('test-multiple-subscribers');

// Reset the singleton
(RealtimeManager as any).instance = null;

// Generate a real token
const getToken = async () => {
return generateConnectionToken(TEST_USER_ID);
};

// Initialize the RealtimeManager
RealtimeManager.initialize({
wsUrl: WS_URL,
getToken,
supabaseUrl: SUPABASE_URL,
supabaseAnonKey: SUPABASE_ANON_KEY,
});

const manager = RealtimeManager.getInstance();
Expand Down
Loading
Loading