Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 39 additions & 70 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,34 @@ import { getServerConfig } from "./config.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

let httpServer: Server | null = null;
const transports = {
streamable: {} as Record<string, StreamableHTTPServerTransport>,
sse: {} as Record<string, SSEServerTransport>,

type Session = {
transport: StreamableHTTPServerTransport | SSEServerTransport;
server: McpServer;
};
const sessions: Record<string, Session> = {};

/**
* Start the MCP server in either stdio or HTTP mode.
*/
export async function startServer(): Promise<void> {
const config = getServerConfig();

const server = createServer(config.auth, {
const serverOptions = {
isHTTP: !config.isStdioMode,
outputFormat: config.outputFormat,
outputFormat: config.outputFormat as "yaml" | "json",
skipImageDownloads: config.skipImageDownloads,
imageDir: config.imageDir,
});
};

if (config.isStdioMode) {
const server = createServer(config.auth, serverOptions);
const transport = new StdioServerTransport();
await server.connect(transport);
} else {
const createMcpServer = () => createServer(config.auth, serverOptions);
console.log(`Initializing Figma MCP Server in HTTP mode on ${config.host}:${config.port}...`);
await startHttpServer(config.host, config.port, server);
await startHttpServer(config.host, config.port, createMcpServer);

process.on("SIGINT", async () => {
Logger.log("Shutting down server...");
Expand All @@ -48,7 +52,7 @@ export async function startServer(): Promise<void> {
export async function startHttpServer(
host: string,
port: number,
mcpServer: McpServer,
createMcpServer: () => McpServer,
): Promise<Server> {
if (httpServer) {
throw new Error("HTTP server is already running");
Expand All @@ -63,44 +67,27 @@ export async function startHttpServer(
app.post("/mcp", async (req, res) => {
Logger.log("Received StreamableHTTP request");
const sessionId = req.headers["mcp-session-id"] as string | undefined;
// Logger.log("Session ID:", sessionId);
// Logger.log("Headers:", req.headers);
// Logger.log("Body:", req.body);
// Logger.log("Is Initialize Request:", isInitializeRequest(req.body));
let transport: StreamableHTTPServerTransport;

if (sessionId && transports.streamable[sessionId]) {
if (sessionId && sessions[sessionId]) {
// Reuse existing transport
Logger.log("Reusing existing StreamableHTTP transport for sessionId", sessionId);
transport = transports.streamable[sessionId];
transport = sessions[sessionId].transport as StreamableHTTPServerTransport;
} else if (!sessionId && isInitializeRequest(req.body)) {
Logger.log("New initialization request for StreamableHTTP sessionId", sessionId);
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
// Store the transport by session ID
transports.streamable[sessionId] = transport;
onsessioninitialized: (newSessionId) => {
sessions[newSessionId] = { transport, server: mcpServer };
},
});
transport.onclose = () => {
if (transport.sessionId) {
delete transports.streamable[transport.sessionId];
delete sessions[transport.sessionId];
}
};
// SDK 1.21+ throws if already connected to another transport. A single
// McpServer can only serve one transport at a time — this is a known
// architectural limitation (see multi-client test in server.test.ts).
try {
await mcpServer.connect(transport);
} catch (error) {
Logger.error("Failed to connect Streamable HTTP transport:", error);
res.status(500).json({
jsonrpc: "2.0",
error: { code: -32000, message: "Server transport conflict" },
id: null,
});
return;
}
const mcpServer = createMcpServer();
await mcpServer.connect(transport);
} else {
// Invalid request
Logger.log("Invalid request:", req.body);
Expand All @@ -117,15 +104,14 @@ export async function startHttpServer(

let progressInterval: NodeJS.Timeout | null = null;
const progressToken = req.body.params?._meta?.progressToken;
// Logger.log("Progress token:", progressToken);
let progress = 0;
if (progressToken) {
if (progressToken && sessionId && sessions[sessionId]) {
Logger.log(
`Setting up progress notifications for token ${progressToken} on session ${sessionId}`,
);
progressInterval = setInterval(async () => {
Logger.log("Sending progress notification", progress);
await mcpServer.server.notification({
await sessions[sessionId].server.server.notification({
method: "notifications/progress",
params: {
progress,
Expand All @@ -148,15 +134,15 @@ export async function startHttpServer(
// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
const handleSessionRequest = async (req: Request, res: Response) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports.streamable[sessionId]) {
if (!sessionId || !sessions[sessionId]) {
res.status(400).send("Invalid or missing session ID");
return;
}

console.log(`Received session termination request for session ${sessionId}`);

try {
const transport = transports.streamable[sessionId];
const transport = sessions[sessionId].transport as StreamableHTTPServerTransport;
await transport.handleRequest(req, res);
} catch (error) {
console.error("Error handling session termination:", error);
Expand All @@ -179,33 +165,23 @@ export async function startHttpServer(
Logger.log("/sse request headers:", req.headers);
Logger.log("/sse request body:", req.body);

transports.sse[transport.sessionId] = transport;
const mcpServer = createMcpServer();
sessions[transport.sessionId] = { transport, server: mcpServer };
res.on("close", () => {
delete transports.sse[transport.sessionId];
delete sessions[transport.sessionId];
});

// SDK 1.21+ throws if already connected to another transport (e.g. a
// Streamable HTTP session). This is a known architectural limitation —
// a single McpServer instance can only serve one transport at a time.
try {
await mcpServer.connect(transport);
} catch (error) {
delete transports.sse[transport.sessionId];
Logger.error("Failed to connect SSE transport:", error);
if (!res.headersSent) {
res.status(500).send("Failed to establish SSE connection");
}
}
await mcpServer.connect(transport);
});

app.post("/messages", async (req, res) => {
const sessionId = req.query.sessionId as string;
const transport = transports.sse[sessionId];
if (transport) {
const session = sessions[sessionId];
if (session) {
Logger.log(`Received SSE message for sessionId ${sessionId}`);
Logger.log("/messages request headers:", req.headers);
Logger.log("/messages request body:", req.body);
await transport.handlePostMessage(req, res);
await (session.transport as SSEServerTransport).handlePostMessage(req, res);
} else {
res.status(400).send(`No transport found for sessionId ${sessionId}`);
return;
Expand All @@ -228,27 +204,20 @@ export async function startHttpServer(
});
}

async function closeTransports(
transports: Record<string, SSEServerTransport | StreamableHTTPServerTransport>,
) {
for (const sessionId in transports) {
try {
await transports[sessionId]?.close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
}

export async function stopHttpServer(): Promise<void> {
if (!httpServer) {
throw new Error("HTTP server is not running");
}

// Close all transports FIRST so connections drain
await closeTransports(transports.sse);
await closeTransports(transports.streamable);
// Close all sessions FIRST so connections drain
for (const sessionId in sessions) {
try {
await sessions[sessionId].transport.close();
delete sessions[sessionId];
} catch (error) {
console.error(`Error closing session ${sessionId}:`, error);
}
}

// Then close the HTTP server
return new Promise((resolve, reject) => {
Expand Down
Loading
Loading