Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
"socket.io-client": "^4.8.3"
"jsonwebtoken": "^9.0.3",
"mongoose": "^9.1.5",
"morgan": "^1.10.1"
"morgan": "^1.10.1",
"socket.io": "^4.8.3"
},
"devDependencies": {
"@babel/eslint-parser": "^7.28.6",
Expand Down
149 changes: 144 additions & 5 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ const MONGODB_URI = process.env.MONGODB_URI;
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: {
origin: process.env.CLIENT_URL ? [process.env.CLIENT_URL] : ['http://localhost:5173', 'http://localhost:3000'],
origin: process.env.CLIENT_URL
? [process.env.CLIENT_URL]
: ['http://localhost:5173', 'http://localhost:3000'],
methods: ['GET', 'POST'],
credentials: true
}
credentials: true,
},
});

// Recreate __dirname since it is not available in ES Modules by default
Expand Down Expand Up @@ -169,6 +171,142 @@ app.get('/health', (req,res)=>{
});
});

// ============== WEBRTC SIGNALING SERVER ==============
// Track active streams: { roomId: { broadcaster: socketId, viewers: [socketId] } }
const activeStreams = new Map();

// API: Get list of active live streams
app.get('/api/live-streams', (req, res) => {
const streams = [];
activeStreams.forEach((value, key) => {
streams.push({ roomId: key, viewerCount: value.viewers.length });
});
res.json(streams);
});

/**
* Validate that a roomId exists in activeStreams and that the socket
* has joined the corresponding Socket.IO room.
* Returns the stream object on success, or null after emitting an error.
*/
function validateRoom(socket, roomId, eventName) {
if (!roomId || !activeStreams.has(roomId)) {
socket.emit('error', {
event: eventName,
message: `Room "${roomId}" does not exist in active streams`,
});
return null;
}

if (!socket.rooms.has(roomId)) {
socket.emit('error', {
event: eventName,
message: `Socket is not a member of room "${roomId}"`,
});
return null;
}

return activeStreams.get(roomId);
}

io.on('connection', (socket) => {
console.log(`[SOCKET] User connected: ${socket.id}`);

// Broadcaster starts a stream
socket.on('start-stream', (roomId) => {
if (activeStreams.has(roomId)) {
socket.emit('error', { message: 'Room already exists' });
return;
}
socket.join(roomId);
activeStreams.set(roomId, { broadcaster: socket.id, viewers: [] });
console.log(`[LIVE] Stream started: ${roomId} by ${socket.id}`);
socket.emit('stream-started', { roomId });
});

// Viewer joins a stream
socket.on('join-stream', (roomId) => {
const stream = activeStreams.get(roomId);
if (!stream) {
socket.emit('error', { message: 'Stream not found' });
return;
}
socket.join(roomId);
stream.viewers.push(socket.id);
console.log(`[VIEWER] ${socket.id} joined ${roomId}`);

// Notify broadcaster that a new viewer joined
io.to(stream.broadcaster).emit('viewer-joined', {
viewerId: socket.id,
});
});

// WebRTC signaling: Offer (broadcaster -> viewer)
socket.on('offer', ({ roomId, offer, viewerId }) => {
const stream = validateRoom(socket, roomId, 'offer');
if (!stream) return;

io.to(viewerId).emit('offer', { offer, broadcasterId: socket.id });
});

// WebRTC signaling: Answer (viewer -> broadcaster)
socket.on('answer', ({ roomId, answer, broadcasterId }) => {
const stream = validateRoom(socket, roomId, 'answer');
if (!stream) return;

io.to(broadcasterId).emit('answer', { answer, viewerId: socket.id });
});

// WebRTC signaling: ICE Candidate
socket.on('ice-candidate', ({ roomId, candidate, targetId }) => {
const stream = validateRoom(socket, roomId, 'ice-candidate');
if (!stream) return;

io.to(targetId).emit('ice-candidate', {
candidate,
senderId: socket.id,
});
});

// Stop stream
socket.on('stop-stream', (roomId) => {
if (activeStreams.has(roomId)) {
io.to(roomId).emit('stream-ended');
activeStreams.delete(roomId);
console.log(`[ENDED] Stream ended: ${roomId}`);
}
});

// Disconnect handling
socket.on('disconnect', () => {
console.log(`[SOCKET] User disconnected: ${socket.id}`);

// Check all streams for this socket
activeStreams.forEach((stream, roomId) => {
if (stream.broadcaster === socket.id) {
// Broadcaster disconnected - end the stream for all viewers
io.to(roomId).emit('stream-ended', {
reason: 'Broadcaster disconnected',
});
activeStreams.delete(roomId);
console.log(
`[ENDED] Stream ${roomId} ended (broadcaster disconnected)`
);
} else if (stream.viewers.includes(socket.id)) {
// Viewer disconnected - remove from list and notify broadcaster
stream.viewers = stream.viewers.filter((id) => id !== socket.id);
io.to(stream.broadcaster).emit('viewer-left', {
viewerId: socket.id,
viewerCount: stream.viewers.length,
});
console.log(
`[VIEWER] ${socket.id} left ${roomId}. Viewers remaining: ${stream.viewers.length}`
);
}
});
});
});

// 404 Not Found handler (must be after all routes)
app.use((req, res) => {
res.status(404).json({
Expand All @@ -189,6 +327,7 @@ app.get('/api/live-streams', (req, res) => {
res.json(streams);
});

httpServer.listen(PORT, () => {
io.on('connection', (socket) => {
console.log(`[SOCKET] User connected: ${socket.id}`);

Expand Down Expand Up @@ -274,5 +413,5 @@ app.listen(PORT, () => {
console.log(`Home: http://localhost:${PORT}`);
console.log('\n✅ Server successfully started!');
console.log(`🏠 Home: http://localhost:${PORT}`);
console.log(`📡 WebRTC Signaling: ws://localhost:${PORT}`);
});
console.log(`WebRTC Signaling: ws://localhost:${PORT}`);
});
Loading