diff --git a/apps/app/src/components/QueryInvalidationSubscriber.tsx b/apps/app/src/components/QueryInvalidationSubscriber.tsx index d529cec7e..0d549b0c1 100644 --- a/apps/app/src/components/QueryInvalidationSubscriber.tsx +++ b/apps/app/src/components/QueryInvalidationSubscriber.tsx @@ -1,6 +1,5 @@ 'use client'; -import { trpc } from '@op/api/client'; import type { ChannelName, RegistryEvents } from '@op/common/realtime'; import { queryChannelRegistry } from '@op/common/realtime'; import { RealtimeManager } from '@op/realtime/client'; @@ -41,15 +40,12 @@ export function QueryInvalidationSubscriber() { function useInvalidateQueries(): void { const queryClient = useRequiredQueryClient(); const invalidatedMutationIds = useRef(new Set()); - const utils = trpc.useUtils(); const unsubscribersRef = useRef void>>(new Map()); const initializedRef = useRef(false); // Store refs to avoid effect re-runs const queryClientRef = useRef(queryClient); - const utilsRef = useRef(utils); queryClientRef.current = queryClient; - utilsRef.current = utils; const handleInvalidation = useCallback( async ({ channels, mutationId }: RegistryEvents['mutation:added']) => { @@ -87,21 +83,19 @@ function useInvalidateQueries(): void { * Subscribe to WebSocket channel when a query registers interest in channels */ useEffect(() => { - const wsUrl = process.env.NEXT_PUBLIC_CENTRIFUGO_WS_URL; + const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_URL; + const supabaseAnonKey = process.env.NEXT_PUBLIC_SUPABASE_ANON_KEY; - // Skip realtime subscriptions entirely if WebSocket URL is not configured - if (!wsUrl) { + // Skip realtime subscriptions entirely if Supabase config is not available + if (!supabaseUrl || !supabaseAnonKey) { return; } // Initialize RealtimeManager only once if (!initializedRef.current) { RealtimeManager.initialize({ - wsUrl, - getToken: async () => { - const { token } = await utilsRef.current.realtime.getToken.fetch(); - return token; - }, + supabaseUrl, + supabaseAnonKey, }); initializedRef.current = true; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f8aba9c30..40503237c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1395,9 +1395,9 @@ importers: '@op/common': specifier: workspace:* version: link:../../packages/common - centrifuge: - specifier: ^5.5.2 - version: 5.5.2 + '@supabase/supabase-js': + specifier: ^2.49.3 + version: 2.49.3 jsonwebtoken: specifier: ^9.0.2 version: 9.0.2 @@ -5825,9 +5825,6 @@ packages: ccount@2.0.1: resolution: {integrity: sha512-eyrF0jiFpY+3drT6383f1qhkbGsLSifNAjA61IUjZjmLCWjItY6LB9ft9YhoDgwfmclB2zhu51Lc7+95b8NRAg==} - centrifuge@5.5.2: - resolution: {integrity: sha512-ewr0sYidOWKevUq9WyaddlYSg3q2Xmxc+OLl5xjV6WFTWOb0IChJjdNaUhkqL5JbFCsIe/eo3KXinKrm7JwUcA==} - chai@5.2.1: resolution: {integrity: sha512-5nFxhUrX0PqtyogoYOA8IPswy5sZFTOsBFl/9bNsmDLgsxYTzSZQJDPppDnZPTQbzSEm0hqGjWPzRemQCYbD6A==} engines: {node: '>=18'} @@ -14761,11 +14758,6 @@ snapshots: ccount@2.0.1: {} - centrifuge@5.5.2: - dependencies: - events: 3.3.0 - protobufjs: 7.5.4 - chai@5.2.1: dependencies: assertion-error: 2.0.1 diff --git a/services/realtime/package.json b/services/realtime/package.json index 9870ac935..df80e65f1 100644 --- a/services/realtime/package.json +++ b/services/realtime/package.json @@ -17,7 +17,7 @@ }, "dependencies": { "@op/common": "workspace:*", - "centrifuge": "^5.5.2", + "@supabase/supabase-js": "^2.49.3", "jsonwebtoken": "^9.0.2", "zod": "catalog:" }, diff --git a/services/realtime/src/client/index.ts b/services/realtime/src/client/index.ts index 92ec3cca0..fbc14e39b 100644 --- a/services/realtime/src/client/index.ts +++ b/services/realtime/src/client/index.ts @@ -1,2 +1,6 @@ -export { type RealtimeConfig, RealtimeManager } from './manager'; +export { + type RealtimeConfig, + type RealtimeHandler, + RealtimeManager, +} from './manager'; export type { RealtimeMessage } from '../schemas'; diff --git a/services/realtime/src/client/manager.test.ts b/services/realtime/src/client/manager.test.ts index 8db80fac4..480178eae 100644 --- a/services/realtime/src/client/manager.test.ts +++ b/services/realtime/src/client/manager.test.ts @@ -1,26 +1,21 @@ import { Channels } from '@op/common/realtime'; import { afterAll, beforeAll, describe, expect, it } from 'vitest'; -import WebSocket from 'ws'; import type { RealtimeMessage } from '../schemas'; import { RealtimeClient } from '../server/client'; -import { generateConnectionToken } from '../server/token'; import { type RealtimeHandler, RealtimeManager } from './manager'; -// Make WebSocket available globally for Centrifuge -global.WebSocket = WebSocket as any; - describe.concurrent('RealtimeManager', () => { let realtimeClient: RealtimeClient; - const WS_URL = process.env.CENTRIFUGO_WS_URL!; - const API_URL = process.env.CENTRIFUGO_API_URL!; - const API_KEY = process.env.CENTRIFUGO_API_KEY!; + const SUPABASE_URL = process.env.SUPABASE_URL!; + const SUPABASE_ANON_KEY = process.env.SUPABASE_ANON_KEY!; + const SUPABASE_SERVICE_ROLE_KEY = process.env.SUPABASE_SERVICE_ROLE_KEY!; beforeAll(() => { // Initialize the server client for publishing messages realtimeClient = new RealtimeClient({ - apiUrl: API_URL, - apiKey: API_KEY, + supabaseUrl: SUPABASE_URL, + serviceRoleKey: SUPABASE_SERVICE_ROLE_KEY, }); }); @@ -32,18 +27,12 @@ describe.concurrent('RealtimeManager', () => { }); it('should connect, subscribe to a channel, and receive published messages', async () => { - const TEST_USER_ID = 'test-user-connect'; const TEST_CHANNEL = Channels.org('test-connect'); - // Generate a real token using the token generation function - const getToken = async () => { - return generateConnectionToken(TEST_USER_ID); - }; - // Initialize the RealtimeManager RealtimeManager.initialize({ - wsUrl: WS_URL, - getToken, + supabaseUrl: SUPABASE_URL, + supabaseAnonKey: SUPABASE_ANON_KEY, }); const manager = RealtimeManager.getInstance(); @@ -81,113 +70,16 @@ describe.concurrent('RealtimeManager', () => { expect(receivedMessage).toEqual(testMessage); }); - it.each([ - { token: '', description: 'empty token' }, - { token: 'random-token', description: 'invalid token' }, - ])( - 'should not allow unauthenticated users to connect to channels with $description', - async ({ token, description }) => { - const TEST_CHANNEL = Channels.org( - `test-unauth-${description.replace(/\s+/g, '-')}`, - ); - - // Create a new instance to avoid interference with other tests - // Force reset the singleton by accessing private static property - (RealtimeManager as any).instance = null; - - // Initialize with an invalid/empty token to simulate unauthenticated user - const getToken = async () => { - return token; - }; - - RealtimeManager.initialize({ - wsUrl: WS_URL, - getToken, - }); - - const manager = RealtimeManager.getInstance(); - - // Track disconnect reason to verify it's auth-related - let disconnectReason: any = null; - - const disconnectionPromise = new Promise((resolve) => { - manager.addConnectionListener((isConnected: boolean) => { - if (!isConnected) { - resolve(true); - } - }); - - // If no disconnection happens within 1 second, assume connection might have succeeded - setTimeout(() => resolve(false), 1000); - }); - - // Trigger connection and hook into centrifuge events immediately - const messageReceived = new Promise((resolve) => { - const handler = () => { - resolve(true); - }; - - // Attempt to subscribe - this triggers ensureConnected() - manager.subscribe(TEST_CHANNEL, handler); - - // Immediately after subscribe, access centrifuge and hook into events - // Use setTimeout to ensure centrifuge is initialized - setTimeout(() => { - const centrifugeInstance = (manager as any).centrifuge; - if (centrifugeInstance) { - centrifugeInstance.on('disconnected', (ctx: any) => { - disconnectReason = ctx; - }); - } - }, 0); - - // Set a timer to resolve false if no message is received - setTimeout(() => resolve(false), 1000); - }); - - // Wait for disconnection or timeout - const disconnected = await disconnectionPromise; - - // Publish a test message - const testMessage: RealtimeMessage = { mutationId: 'test-mutation-2' }; - - await realtimeClient.publish({ - channel: TEST_CHANNEL, - data: testMessage, - }); - - // Wait to see if the message is received - const received = await messageReceived; - - // Verify that either: - // 1. The connection was never established (disconnected) - // 2. No message was received (because connection failed) - expect(disconnected || !received).toBe(true); - - // Verify the disconnect reason indicates authentication failure - expect(disconnectReason).toBeDefined(); - // Both 3500 (invalid token) and 3501 (bad request) are auth-related errors - expect([3500, 3501]).toContain(disconnectReason.code); - expect(disconnectReason.reason).toBeDefined(); - }, - ); - it('should deliver messages to multiple subscribers on the same channel', async () => { - const TEST_USER_ID = 'test-user-multiple'; const TEST_CHANNEL = Channels.org('test-multiple-subscribers'); // Reset the singleton (RealtimeManager as any).instance = null; - // Generate a real token - const getToken = async () => { - return generateConnectionToken(TEST_USER_ID); - }; - // Initialize the RealtimeManager RealtimeManager.initialize({ - wsUrl: WS_URL, - getToken, + supabaseUrl: SUPABASE_URL, + supabaseAnonKey: SUPABASE_ANON_KEY, }); const manager = RealtimeManager.getInstance(); diff --git a/services/realtime/src/client/manager.ts b/services/realtime/src/client/manager.ts index 38d234185..15a343eb2 100644 --- a/services/realtime/src/client/manager.ts +++ b/services/realtime/src/client/manager.ts @@ -1,10 +1,6 @@ import { type ChannelName } from '@op/common/realtime'; -import { - Centrifuge, - type ErrorContext, - type PublicationContext, - type Subscription, -} from 'centrifuge'; +import { type SupabaseClient, createClient } from '@supabase/supabase-js'; +import type { RealtimeChannel } from '@supabase/supabase-js'; import { type RealtimeMessage, realtimeMessageSchema } from '../schemas'; @@ -14,17 +10,17 @@ export type RealtimeHandler = (event: { }) => void; export interface RealtimeConfig { - wsUrl: string; - getToken: () => Promise; + supabaseUrl: string; + supabaseAnonKey: string; } /** - * Singleton realtime manager for WebSocket connections and channel subscriptions + * Singleton realtime manager for Supabase Realtime channel subscriptions */ export class RealtimeManager { private static instance: RealtimeManager | null = null; - private centrifuge: Centrifuge | null = null; - private subscriptions = new Map(); + private supabase: SupabaseClient | null = null; + private channels = new Map(); private channelListeners = new Map>(); private connectionListeners = new Set<(isConnected: boolean) => void>(); private config: RealtimeConfig | null = null; @@ -49,8 +45,8 @@ export class RealtimeManager { instance.config = config; } - private ensureConnected() { - if (this.centrifuge) { + private ensureClient() { + if (this.supabase) { return; } @@ -60,30 +56,10 @@ export class RealtimeManager { ); } - this.centrifuge = new Centrifuge(this.config.wsUrl, { - // Centrifuge will automatically call getToken when connecting - // and when the token is about to expire for automatic refresh - getToken: async () => { - const token = await this.config!.getToken(); - return token; - }, - }); - - this.centrifuge.on('connected', () => { - console.log('[Realtime] Connected'); - this.connectionListeners.forEach((listener) => listener(true)); - }); - - this.centrifuge.on('disconnected', () => { - console.log('[Realtime] Disconnected'); - this.connectionListeners.forEach((listener) => listener(false)); - }); - - this.centrifuge.on('error', (ctx: ErrorContext) => { - console.error('[Realtime] Error:', ctx); - }); - - this.centrifuge.connect(); + this.supabase = createClient( + this.config.supabaseUrl, + this.config.supabaseAnonKey, + ); } /** @@ -91,10 +67,10 @@ export class RealtimeManager { * Returns an unsubscribe function to clean up the subscription */ subscribe(channel: ChannelName, handler: RealtimeHandler): () => void { - this.ensureConnected(); + this.ensureClient(); - if (!this.centrifuge) { - throw new Error('Centrifuge instance not initialized'); + if (!this.supabase) { + throw new Error('Supabase client not initialized'); } // Add handler to channel listeners @@ -109,48 +85,53 @@ export class RealtimeManager { '[Realtime] Handler already subscribed to channel:', channel, ); - return () => {}; // Return no-op function + return () => {}; } listeners.add(handler); - // Create subscription if it doesn't exist - if (!this.subscriptions.has(channel)) { - const sub = this.centrifuge.newSubscription(channel); - - sub.on('publication', (ctx: PublicationContext) => { - // Validate the message with Zod schema - const parseResult = realtimeMessageSchema.safeParse(ctx.data); - - if (!parseResult.success) { - console.error( - '[Realtime] Invalid message format:', - parseResult.error, - ); - return; - } - - const data = parseResult.data; + // Create channel subscription if it doesn't exist + if (!this.channels.has(channel)) { + const realtimeChannel = this.supabase.channel(channel); + + realtimeChannel.on( + 'broadcast', + { event: 'invalidation' }, + ({ payload }) => { + // Validate the message with Zod schema + const parseResult = realtimeMessageSchema.safeParse(payload); + + if (!parseResult.success) { + console.error( + '[Realtime] Invalid message format:', + parseResult.error, + ); + return; + } + + const data = parseResult.data; + + // Notify all listeners for this channel + const channelListeners = this.channelListeners.get(channel); + if (channelListeners) { + channelListeners.forEach((listener) => listener({ channel, data })); + } + }, + ); - // Notify all listeners for this channel - const channelListeners = this.channelListeners.get(channel); - if (channelListeners) { - channelListeners.forEach((listener) => - listener({ channel: ctx.channel as ChannelName, data }), - ); + realtimeChannel.subscribe((status) => { + if (status === 'SUBSCRIBED') { + console.log('[Realtime] Subscribed to channel:', channel); + this.connectionListeners.forEach((listener) => listener(true)); + } else if (status === 'CLOSED') { + console.log('[Realtime] Unsubscribed from channel:', channel); + this.connectionListeners.forEach((listener) => listener(false)); + } else if (status === 'CHANNEL_ERROR') { + console.error('[Realtime] Channel error:', channel); } }); - sub.on('subscribed', () => { - console.log('[Realtime] Subscribed to channel:', channel); - }); - - sub.on('unsubscribed', () => { - console.log('[Realtime] Unsubscribed from channel:', channel); - }); - - sub.subscribe(); - this.subscriptions.set(channel, sub); + this.channels.set(channel, realtimeChannel); } // Return unsubscribe function @@ -171,56 +152,48 @@ export class RealtimeManager { // Remove the handler listeners.delete(handler); - // If no more handlers for this channel, unsubscribe from Centrifuge + // If no more handlers for this channel, remove the Supabase channel if (listeners.size === 0) { this.channelListeners.delete(channel); - const sub = this.subscriptions.get(channel); - if (sub) { - sub.unsubscribe(); - sub.removeAllListeners(); - this.subscriptions.delete(channel); + const realtimeChannel = this.channels.get(channel); + if (realtimeChannel && this.supabase) { + this.supabase.removeChannel(realtimeChannel); + this.channels.delete(channel); } } - // If no more active subscriptions, disconnect - if (this.subscriptions.size === 0) { + // If no more active channels, disconnect + if (this.channels.size === 0) { this.disconnect(); } } /** - * Disconnect from the WebSocket server and clean up all subscriptions + * Disconnect from Supabase Realtime and clean up all channels */ private disconnect(): void { - if (!this.centrifuge) { + if (!this.supabase) { return; } console.log('[Realtime] Disconnecting...'); - // Clean up all subscriptions - this.subscriptions.forEach((sub) => { - sub.unsubscribe(); - sub.removeAllListeners(); + // Clean up all channels + this.channels.forEach((realtimeChannel) => { + this.supabase!.removeChannel(realtimeChannel); }); - this.subscriptions.clear(); + this.channels.clear(); this.channelListeners.clear(); - // Disconnect and clean up centrifuge instance - this.centrifuge.disconnect(); - this.centrifuge = null; + this.supabase = null; } /** - * Add a connection state listener (called immediately with current state if connected) + * Add a connection state listener */ addConnectionListener(listener: (isConnected: boolean) => void) { this.connectionListeners.add(listener); - // Immediately notify with current state if connected - if (this.centrifuge?.state === 'connected') { - listener(true); - } } removeConnectionListener(listener: (isConnected: boolean) => void) { diff --git a/services/realtime/src/server/client.ts b/services/realtime/src/server/client.ts index 91e6694c3..204e3dc5f 100644 --- a/services/realtime/src/server/client.ts +++ b/services/realtime/src/server/client.ts @@ -1,19 +1,15 @@ import type { ChannelName } from '@op/common/realtime'; /** - * Realtime backend client for interacting with the real-time messaging service + * Realtime backend client for publishing via Supabase Broadcast REST API */ export class RealtimeClient { - private apiUrl: string; - private apiKey: string; + private broadcastUrl: string; + private serviceRoleKey: string; - /** - * apiURL: Base URL of the realtime API (e.g the admin API for Centrifugo) - * apiKey: API key for authenticating requests - */ - constructor(config: { apiUrl: string; apiKey: string }) { - this.apiUrl = config.apiUrl; - this.apiKey = config.apiKey; + constructor(config: { supabaseUrl: string; serviceRoleKey: string }) { + this.broadcastUrl = `${config.supabaseUrl}/realtime/v1/api/broadcast`; + this.serviceRoleKey = config.serviceRoleKey; } /** @@ -25,22 +21,26 @@ export class RealtimeClient { }): Promise { const { channel, data } = options; - const response = await fetch(`${this.apiUrl}/publish`, { + const response = await fetch(this.broadcastUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-API-Key': this.apiKey, + apikey: this.serviceRoleKey, + Authorization: `Bearer ${this.serviceRoleKey}`, }, - body: JSON.stringify({ channel, data }), + body: JSON.stringify({ + messages: [ + { + topic: channel, + event: 'invalidation', + payload: data, + }, + ], + }), }); if (!response.ok) { throw new Error(`Realtime publish failed: ${response.statusText}`); } - - const result = await response.json(); - if (result.error) { - throw new Error(`Realtime error: ${result.error.message}`); - } } } diff --git a/services/realtime/src/server/service.ts b/services/realtime/src/server/service.ts index 307b0f9de..1c4c4f710 100644 --- a/services/realtime/src/server/service.ts +++ b/services/realtime/src/server/service.ts @@ -11,24 +11,24 @@ class RealtimeService { private getClient(): RealtimeClient { if (!this.client) { - const apiUrl = process.env.CENTRIFUGO_API_URL; - const apiKey = process.env.CENTRIFUGO_API_KEY; + const supabaseUrl = process.env.NEXT_PUBLIC_SUPABASE_URL; + const serviceRoleKey = process.env.SUPABASE_SERVICE_ROLE; - if (!apiUrl) { + if (!supabaseUrl) { throw new Error( - '[Realtime] CENTRIFUGO_API_URL is not set. Realtime publishing will be disabled.', + '[Realtime] NEXT_PUBLIC_SUPABASE_URL is not set. Realtime publishing will be disabled.', ); } - if (!apiKey) { + if (!serviceRoleKey) { throw new Error( - '[Realtime] CENTRIFUGO_API_KEY is not set. Realtime publishing will be disabled.', + '[Realtime] SUPABASE_SERVICE_ROLE is not set. Realtime publishing will be disabled.', ); } this.client = new RealtimeClient({ - apiUrl, - apiKey, + supabaseUrl, + serviceRoleKey, }); } diff --git a/supabase/supabase-dev.toml b/supabase/supabase-dev.toml index e8dd7ada7..cbecded7c 100644 --- a/supabase/supabase-dev.toml +++ b/supabase/supabase-dev.toml @@ -38,7 +38,7 @@ enabled = true sql_paths = [ './seed.sql' ] [realtime] -enabled = false +enabled = true [studio] enabled = true