diff --git a/package.json b/package.json index 9473069..a8e323e 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,8 @@ "socket.io-client": "^4.8.3" "jsonwebtoken": "^9.0.3", "mongoose": "^9.1.5", - "morgan": "^1.10.1" + "morgan": "^1.10.1", + "socket.io": "^4.8.3" }, "devDependencies": { "@babel/eslint-parser": "^7.28.6", diff --git a/server.js b/server.js index 0c2965d..f4dc82b 100644 --- a/server.js +++ b/server.js @@ -25,10 +25,12 @@ const MONGODB_URI = process.env.MONGODB_URI; const httpServer = createServer(app); const io = new Server(httpServer, { cors: { - origin: process.env.CLIENT_URL ? [process.env.CLIENT_URL] : ['http://localhost:5173', 'http://localhost:3000'], + origin: process.env.CLIENT_URL + ? [process.env.CLIENT_URL] + : ['http://localhost:5173', 'http://localhost:3000'], methods: ['GET', 'POST'], - credentials: true - } + credentials: true, + }, }); // Recreate __dirname since it is not available in ES Modules by default @@ -169,6 +171,142 @@ app.get('/health', (req,res)=>{ }); }); +// ============== WEBRTC SIGNALING SERVER ============== +// Track active streams: { roomId: { broadcaster: socketId, viewers: [socketId] } } +const activeStreams = new Map(); + +// API: Get list of active live streams +app.get('/api/live-streams', (req, res) => { + const streams = []; + activeStreams.forEach((value, key) => { + streams.push({ roomId: key, viewerCount: value.viewers.length }); + }); + res.json(streams); +}); + +/** + * Validate that a roomId exists in activeStreams and that the socket + * has joined the corresponding Socket.IO room. + * Returns the stream object on success, or null after emitting an error. + */ +function validateRoom(socket, roomId, eventName) { + if (!roomId || !activeStreams.has(roomId)) { + socket.emit('error', { + event: eventName, + message: `Room "${roomId}" does not exist in active streams`, + }); + return null; + } + + if (!socket.rooms.has(roomId)) { + socket.emit('error', { + event: eventName, + message: `Socket is not a member of room "${roomId}"`, + }); + return null; + } + + return activeStreams.get(roomId); +} + +io.on('connection', (socket) => { + console.log(`[SOCKET] User connected: ${socket.id}`); + + // Broadcaster starts a stream + socket.on('start-stream', (roomId) => { + if (activeStreams.has(roomId)) { + socket.emit('error', { message: 'Room already exists' }); + return; + } + socket.join(roomId); + activeStreams.set(roomId, { broadcaster: socket.id, viewers: [] }); + console.log(`[LIVE] Stream started: ${roomId} by ${socket.id}`); + socket.emit('stream-started', { roomId }); + }); + + // Viewer joins a stream + socket.on('join-stream', (roomId) => { + const stream = activeStreams.get(roomId); + if (!stream) { + socket.emit('error', { message: 'Stream not found' }); + return; + } + socket.join(roomId); + stream.viewers.push(socket.id); + console.log(`[VIEWER] ${socket.id} joined ${roomId}`); + + // Notify broadcaster that a new viewer joined + io.to(stream.broadcaster).emit('viewer-joined', { + viewerId: socket.id, + }); + }); + + // WebRTC signaling: Offer (broadcaster -> viewer) + socket.on('offer', ({ roomId, offer, viewerId }) => { + const stream = validateRoom(socket, roomId, 'offer'); + if (!stream) return; + + io.to(viewerId).emit('offer', { offer, broadcasterId: socket.id }); + }); + + // WebRTC signaling: Answer (viewer -> broadcaster) + socket.on('answer', ({ roomId, answer, broadcasterId }) => { + const stream = validateRoom(socket, roomId, 'answer'); + if (!stream) return; + + io.to(broadcasterId).emit('answer', { answer, viewerId: socket.id }); + }); + + // WebRTC signaling: ICE Candidate + socket.on('ice-candidate', ({ roomId, candidate, targetId }) => { + const stream = validateRoom(socket, roomId, 'ice-candidate'); + if (!stream) return; + + io.to(targetId).emit('ice-candidate', { + candidate, + senderId: socket.id, + }); + }); + + // Stop stream + socket.on('stop-stream', (roomId) => { + if (activeStreams.has(roomId)) { + io.to(roomId).emit('stream-ended'); + activeStreams.delete(roomId); + console.log(`[ENDED] Stream ended: ${roomId}`); + } + }); + + // Disconnect handling + socket.on('disconnect', () => { + console.log(`[SOCKET] User disconnected: ${socket.id}`); + + // Check all streams for this socket + activeStreams.forEach((stream, roomId) => { + if (stream.broadcaster === socket.id) { + // Broadcaster disconnected - end the stream for all viewers + io.to(roomId).emit('stream-ended', { + reason: 'Broadcaster disconnected', + }); + activeStreams.delete(roomId); + console.log( + `[ENDED] Stream ${roomId} ended (broadcaster disconnected)` + ); + } else if (stream.viewers.includes(socket.id)) { + // Viewer disconnected - remove from list and notify broadcaster + stream.viewers = stream.viewers.filter((id) => id !== socket.id); + io.to(stream.broadcaster).emit('viewer-left', { + viewerId: socket.id, + viewerCount: stream.viewers.length, + }); + console.log( + `[VIEWER] ${socket.id} left ${roomId}. Viewers remaining: ${stream.viewers.length}` + ); + } + }); + }); +}); + // 404 Not Found handler (must be after all routes) app.use((req, res) => { res.status(404).json({ @@ -189,6 +327,7 @@ app.get('/api/live-streams', (req, res) => { res.json(streams); }); +httpServer.listen(PORT, () => { io.on('connection', (socket) => { console.log(`[SOCKET] User connected: ${socket.id}`); @@ -274,5 +413,5 @@ app.listen(PORT, () => { console.log(`Home: http://localhost:${PORT}`); console.log('\nāœ… Server successfully started!'); console.log(`šŸ  Home: http://localhost:${PORT}`); - console.log(`šŸ“” WebRTC Signaling: ws://localhost:${PORT}`); -}); \ No newline at end of file + console.log(`WebRTC Signaling: ws://localhost:${PORT}`); +});