diff --git a/.gitignore b/.gitignore index 1a896b2b..fb31baa1 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,9 @@ auth-*.json users.csv cache/ +# Usage statistics data +data/ + # Temporary files tmp/ ui/dist/ diff --git a/src/core/ProxyServerSystem.js b/src/core/ProxyServerSystem.js index 4511cb44..b5345050 100644 --- a/src/core/ProxyServerSystem.js +++ b/src/core/ProxyServerSystem.js @@ -12,6 +12,7 @@ const http = require("http"); const https = require("https"); const fs = require("fs"); const net = require("net"); +const path = require("path"); const { URL } = require("url"); const LoggingService = require("../utils/LoggingService"); @@ -19,6 +20,7 @@ const AuthSource = require("../auth/AuthSource"); const BrowserManager = require("./BrowserManager"); const ConnectionRegistry = require("./ConnectionRegistry"); const RequestHandler = require("./RequestHandler"); +const UsageStatsService = require("./UsageStatsService"); const ConfigLoader = require("../utils/ConfigLoader"); const WebRoutes = require("../routes/WebRoutes"); @@ -40,6 +42,7 @@ class ProxyServerSystem extends EventEmitter { this.authSource = new AuthSource(this.logger); this.browserManager = new BrowserManager(this.logger, this.config, this.authSource); + this.usageStatsService = new UsageStatsService(this.authSource, this.logger, path.join(process.cwd(), "data")); // Create ConnectionRegistry with lightweight reconnect callback // When WebSocket connection is lost but browser is still running, @@ -351,6 +354,7 @@ class ProxyServerSystem extends EventEmitter { app.use((req, res, next) => { if ( req.path !== "/api/status" && + req.path !== "/api/usage-stats" && req.path !== "/" && req.path !== "/favicon.ico" && req.path !== "/login" && diff --git a/src/core/RequestHandler.js b/src/core/RequestHandler.js index 4c53b8a8..e78889f2 100644 --- a/src/core/RequestHandler.js +++ b/src/core/RequestHandler.js @@ -58,6 +58,108 @@ class RequestHandler { return this.authSwitcher.isSystemBusy; } + _getUsageStatsService() { + return this.serverSystem.usageStatsService || null; + } + + _getAccountNameForIndex(authIndex) { + if (!Number.isInteger(authIndex) || authIndex < 0) { + return null; + } + + return this.authSource?.accountNameMap?.get(authIndex) || null; + } + + _getClientIp(req) { + return this.serverSystem.webRoutes.authRoutes.getClientIP(req); + } + + _extractModelFromPath(pathValue) { + if (typeof pathValue !== "string") return null; + + const match = pathValue.match(/\/models\/([^:/?]+)(?::|$)/); + return match?.[1] || null; + } + + _categorizeRequest(pathValue, fallback = "request") { + if (typeof pathValue !== "string") return fallback; + if (pathValue.includes("countTokens") || pathValue.includes("input_tokens")) return "count_tokens"; + if (pathValue.includes("generateContent") || pathValue.includes("streamGenerateContent")) return "generation"; + if (pathValue.includes("/upload/")) return "upload"; + return fallback; + } + + _startTrackedRequest(requestId, req, meta = {}) { + const usageStatsService = this._getUsageStatsService(); + if (!usageStatsService) return; + + usageStatsService.startRequest(requestId, { + clientIp: this._getClientIp(req), + initialAccountName: this._getAccountNameForIndex(this.currentAuthIndex), + initialAuthIndex: this.currentAuthIndex, + method: req.method, + path: req.path, + ...meta, + }); + } + + _updateTrackedRequest(requestId, patch = {}) { + const usageStatsService = this._getUsageStatsService(); + if (!usageStatsService) return; + usageStatsService.updateRequest(requestId, patch); + } + + _finalizeTrackedRequest(requestId, res, overrides = {}) { + const usageStatsService = this._getUsageStatsService(); + if (!usageStatsService) return; + + let outcome = overrides.outcome; + if (!outcome) { + if (res.__usageTrackingClientAborted) { + outcome = "aborted"; + } else if (res.__usageTrackingOutcome) { + outcome = res.__usageTrackingOutcome; + } else { + const statusCode = Number.isFinite(res.statusCode) ? Number(res.statusCode) : null; + outcome = statusCode !== null && statusCode >= 400 ? "error" : "success"; + } + } + + const statusCode = + overrides.statusCode ?? + res.__usageTrackingErrorStatus ?? + (Number.isFinite(res.statusCode) && res.statusCode > 0 ? Number(res.statusCode) : null); + + const errorMessage = + overrides.errorMessage ?? + res.__usageTrackingErrorMessage ?? + (outcome === "error" ? "Request failed" : null); + + usageStatsService.finishRequest(requestId, { + errorMessage, + finalAccountName: overrides.finalAccountName, + finalAuthIndex: overrides.finalAuthIndex, + outcome, + statusCode, + }); + } + + _markTrackedResponseError(res, message, statusCode = null, outcome = "error") { + if (!res) return; + res.__usageTrackingOutcome = outcome; + res.__usageTrackingErrorMessage = message || null; + if (Number.isFinite(statusCode)) { + res.__usageTrackingErrorStatus = Number(statusCode); + } + } + + _markTrackedClientAbort(res, message = "Client disconnected") { + if (!res) return; + res.__usageTrackingClientAborted = true; + res.__usageTrackingOutcome = "aborted"; + res.__usageTrackingErrorMessage = message; + } + // Delegate methods to AuthSwitcher async _switchToNextAuth() { return this.authSwitcher.switchToNextAuth(); @@ -126,6 +228,7 @@ class RequestHandler { if (isClientDisconnect) { // Client disconnected or queue closed due to client disconnect - no error needed + this._markTrackedClientAbort(res, error.message || "Client disconnected"); this.logger.debug( `[Request] ${format} stream interrupted by client disconnect (reason: ${error.reason || "connection_lost"})` ); @@ -144,6 +247,7 @@ class RequestHandler { try { const errorMessage = `Stream interrupted: ${error.reason === "page_closed" ? "Account context closed" : error.reason || "Connection lost"}`; + this._markTrackedResponseError(res, errorMessage, 503); if (format === "claude") { // Claude format: event: error\ndata: {...} @@ -213,9 +317,13 @@ class RequestHandler { try { let errorPayload; + let trackingStatus = 500; + let trackingMessage = String(error?.message ?? error); if (error.code === "QUEUE_TIMEOUT" || error instanceof QueueTimeoutError) { // True timeout error - 504 + trackingStatus = 504; + trackingMessage = `Stream timeout: ${trackingMessage}`; if (format === "openai") { errorPayload = { error: { @@ -268,6 +376,8 @@ class RequestHandler { } } else if (error.code === "QUEUE_CLOSED" || error instanceof QueueClosedError) { // Queue closed (account switch, system reset, etc.) - 503 + trackingStatus = 503; + trackingMessage = `Service unavailable: ${trackingMessage}`; if (format === "openai") { errorPayload = { error: { @@ -322,6 +432,8 @@ class RequestHandler { // Other unexpected errors - rethrow to outer handler throw error; } + + this._markTrackedResponseError(res, trackingMessage, trackingStatus); } catch (writeError) { this.logger.debug(`[Request] Failed to write fake stream error to client: ${writeError.message}`); // If write failed or unexpected error, rethrow original error @@ -623,88 +735,107 @@ class RequestHandler { // Process standard Google API requests async processRequest(req, res) { const requestId = this._generateRequestId(); + this._startTrackedRequest(requestId, req, { + apiFormat: "gemini", + requestCategory: this._categorizeRequest(req.path, "request"), + }); res.__proxyResponseStreamMode = null; - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } + try { + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; + } - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res); - if (!ready) return; - } - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } - // Handle usage-based account switching - const isGenerativeRequest = - req.method === "POST" && - (req.path.includes("generateContent") || req.path.includes("streamGenerateContent")); + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res); + if (!ready) return; + } + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } + // Handle usage-based account switching + const isGenerativeRequest = + req.method === "POST" && + (req.path.includes("generateContent") || req.path.includes("streamGenerateContent")); - if (isGenerativeRequest) { - const usageCount = this.authSwitcher.incrementUsageCount(); - if (usageCount > 0) { - const rotationCountText = - this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; - this.logger.info( - `[Request] Generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` - ); - if (this.authSwitcher.shouldSwitchByUsage()) { - this.needsSwitchingAfterRequest = true; + if (isGenerativeRequest) { + const usageCount = this.authSwitcher.incrementUsageCount(); + if (usageCount > 0) { + const rotationCountText = + this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; + this.logger.info( + `[Request] Generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` + ); + if (this.authSwitcher.shouldSwitchByUsage()) { + this.needsSwitchingAfterRequest = true; + } } } - } - const proxyRequest = this._buildProxyRequest(req, requestId); - proxyRequest.is_generative = isGenerativeRequest; - this._initializeProxyRequestAttempt(proxyRequest); + const proxyRequest = this._buildProxyRequest(req, requestId); + proxyRequest.is_generative = isGenerativeRequest; + this._initializeProxyRequestAttempt(proxyRequest); - const wantsStream = req.path.includes(":streamGenerateContent"); - res.__proxyResponseStreamMode = wantsStream ? proxyRequest.streaming_mode : null; + const wantsStream = req.path.includes(":streamGenerateContent"); + res.__proxyResponseStreamMode = wantsStream ? proxyRequest.streaming_mode : null; - try { - // Create message queue inside try-catch to handle invalid authIndex - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + this._updateTrackedRequest(requestId, { + isStreaming: wantsStream, + model: this._extractModelFromPath(proxyRequest.path), + path: proxyRequest.path, + requestCategory: this._categorizeRequest( + proxyRequest.path, + isGenerativeRequest ? "generation" : "request" + ), + streamMode: wantsStream ? proxyRequest.streaming_mode : null, + }); - if (wantsStream) { - this.logger.info( - `[Request] Client enabled streaming (${proxyRequest.streaming_mode}), entering streaming processing mode...` + try { + // Create message queue inside try-catch to handle invalid authIndex + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id ); - if (proxyRequest.streaming_mode === "fake") { - await this._handlePseudoStreamResponse(proxyRequest, messageQueue, req, res); + this._setupClientDisconnectHandler(res, requestId); + + if (wantsStream) { + this.logger.info( + `[Request] Client enabled streaming (${proxyRequest.streaming_mode}), entering streaming processing mode...` + ); + if (proxyRequest.streaming_mode === "fake") { + await this._handlePseudoStreamResponse(proxyRequest, messageQueue, req, res); + } else { + await this._handleRealStreamResponse(proxyRequest, messageQueue, req, res); + } } else { - await this._handleRealStreamResponse(proxyRequest, messageQueue, req, res); + proxyRequest.streaming_mode = "fake"; + await this._handleNonStreamResponse(proxyRequest, messageQueue, req, res); } - } else { - proxyRequest.streaming_mode = "fake"; - await this._handleNonStreamResponse(proxyRequest, messageQueue, req, res); - } - } catch (error) { - // Handle queue timeout by notifying browser - this._handleQueueTimeout(error, requestId); + } catch (error) { + // Handle queue timeout by notifying browser + this._handleQueueTimeout(error, requestId); - this._handleRequestError(error, res, "gemini"); - } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (this.needsSwitchingAfterRequest) { - this.logger.info( - `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` - ); - this.authSwitcher.switchToNextAuth().catch(err => { - this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); - }); - this.needsSwitchingAfterRequest = false; + this._handleRequestError(error, res, "gemini"); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (this.needsSwitchingAfterRequest) { + this.logger.info( + `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` + ); + this.authSwitcher.switchToNextAuth().catch(err => { + this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); + }); + this.needsSwitchingAfterRequest = false; + } + if (!res.writableEnded) res.end(); } - if (!res.writableEnded) res.end(); + } finally { + this._finalizeTrackedRequest(requestId, res); } } @@ -712,634 +843,621 @@ class RequestHandler { async processUploadRequest(req, res) { const requestId = this._generateRequestId(); this.logger.info(`[Upload] Processing upload request ${req.method} ${req.path} (ID: ${requestId})`); + this._startTrackedRequest(requestId, req, { + apiFormat: "upload", + isStreaming: false, + requestCategory: "upload", + streamMode: null, + }); - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Upload] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } + try { + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Upload] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; + } - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res); - if (!ready) return; - } + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res); + if (!ready) return; + } - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } - const proxyRequest = { - body_b64: req.rawBody ? req.rawBody.toString("base64") : undefined, - headers: req.headers, - is_generative: false, // Uploads are never generative - method: req.method, - path: req.path.replace(/^\/proxy/, ""), - query_params: req.query || {}, - request_id: requestId, - streaming_mode: "fake", // Uploads always return a single JSON response - }; - this._initializeProxyRequestAttempt(proxyRequest); + const proxyRequest = { + body_b64: req.rawBody ? req.rawBody.toString("base64") : undefined, + headers: req.headers, + is_generative: false, // Uploads are never generative + method: req.method, + path: req.path.replace(/^\/proxy/, ""), + query_params: req.query || {}, + request_id: requestId, + streaming_mode: "fake", // Uploads always return a single JSON response + }; + this._initializeProxyRequestAttempt(proxyRequest); + this._updateTrackedRequest(requestId, { + path: proxyRequest.path, + }); - try { - // Create message queue inside try-catch to handle invalid authIndex - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + try { + // Create message queue inside try-catch to handle invalid authIndex + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + this._setupClientDisconnectHandler(res, requestId); - await this._handleNonStreamResponse(proxyRequest, messageQueue, req, res); - } catch (error) { - this._handleRequestError(error, res); + await this._handleNonStreamResponse(proxyRequest, messageQueue, req, res); + } catch (error) { + this._handleRequestError(error, res); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (!res.writableEnded) res.end(); + } } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (!res.writableEnded) res.end(); + this._finalizeTrackedRequest(requestId, res); } } // Process OpenAI format requests async processOpenAIRequest(req, res) { const requestId = this._generateRequestId(); + this._startTrackedRequest(requestId, req, { + apiFormat: "openai", + isStreaming: req.body.stream === true, + requestCategory: "generation", + streamMode: req.body.stream === true ? this.serverSystem.streamingMode : null, + }); res.__proxyResponseStreamMode = null; - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } - - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res); - if (!ready) return; - } - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } - - const isOpenAIStream = req.body.stream === true; - const systemStreamMode = this.serverSystem.streamingMode; - - // Handle usage counting - const usageCount = this.authSwitcher.incrementUsageCount(); - if (usageCount > 0) { - const rotationCountText = - this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; - this.logger.info( - `[Request] OpenAI generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` - ); - if (this.authSwitcher.shouldSwitchByUsage()) { - this.needsSwitchingAfterRequest = true; + try { + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; } - } - // Translate OpenAI format to Google format (also handles model name suffix parsing) - let googleBody, model, modelStreamingMode; - try { - const result = await this.formatConverter.translateOpenAIToGoogle(req.body); - googleBody = result.googleRequest; - model = result.cleanModelName; - modelStreamingMode = result.modelStreamingMode || null; - } catch (error) { - this.logger.error(`[Adapter] OpenAI request translation failed: ${error.message}`); - return this._sendErrorResponse(res, 400, "Invalid OpenAI request format."); - } + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res); + if (!ready) return; + } + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } - const effectiveStreamMode = modelStreamingMode || systemStreamMode; - const useRealStream = isOpenAIStream && effectiveStreamMode === "real"; + const isOpenAIStream = req.body.stream === true; + const systemStreamMode = this.serverSystem.streamingMode; - const googleEndpoint = useRealStream ? "streamGenerateContent" : "generateContent"; - const proxyRequest = { - body: JSON.stringify(googleBody), - headers: { "Content-Type": "application/json" }, - is_generative: true, - method: "POST", - path: `/v1beta/models/${model}:${googleEndpoint}`, - query_params: useRealStream ? { alt: "sse" } : {}, - request_id: requestId, - streaming_mode: useRealStream ? "real" : "fake", - }; - this._initializeProxyRequestAttempt(proxyRequest); - res.__proxyResponseStreamMode = isOpenAIStream ? (useRealStream ? "real" : "fake") : null; + // Handle usage counting + const usageCount = this.authSwitcher.incrementUsageCount(); + if (usageCount > 0) { + const rotationCountText = + this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; + this.logger.info( + `[Request] OpenAI generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` + ); + if (this.authSwitcher.shouldSwitchByUsage()) { + this.needsSwitchingAfterRequest = true; + } + } - try { - // Create message queue inside try-catch to handle invalid authIndex - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + // Translate OpenAI format to Google format (also handles model name suffix parsing) + let googleBody, model, modelStreamingMode; + try { + const result = await this.formatConverter.translateOpenAIToGoogle(req.body); + googleBody = result.googleRequest; + model = result.cleanModelName; + modelStreamingMode = result.modelStreamingMode || null; + } catch (error) { + this.logger.error(`[Adapter] OpenAI request translation failed: ${error.message}`); + return this._sendErrorResponse(res, 400, "Invalid OpenAI request format."); + } + + const effectiveStreamMode = modelStreamingMode || systemStreamMode; + const useRealStream = isOpenAIStream && effectiveStreamMode === "real"; + const googleEndpoint = useRealStream ? "streamGenerateContent" : "generateContent"; + const proxyRequest = { + body: JSON.stringify(googleBody), + headers: { "Content-Type": "application/json" }, + is_generative: true, + method: "POST", + path: `/v1beta/models/${model}:${googleEndpoint}`, + query_params: useRealStream ? { alt: "sse" } : {}, + request_id: requestId, + streaming_mode: useRealStream ? "real" : "fake", + }; + this._initializeProxyRequestAttempt(proxyRequest); + res.__proxyResponseStreamMode = isOpenAIStream ? (useRealStream ? "real" : "fake") : null; + this._updateTrackedRequest(requestId, { + isStreaming: isOpenAIStream, + model, + path: proxyRequest.path, + requestCategory: "generation", + ...(isOpenAIStream ? { streamMode: useRealStream ? "real" : "fake" } : {}), + }); - if (useRealStream) { - let currentQueue = messageQueue; - let initialMessage; - let skipFinalFailureSwitch = false; - const immediateSwitchTracker = this._createImmediateSwitchTracker(); + try { + // Create message queue inside try-catch to handle invalid authIndex + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + this._setupClientDisconnectHandler(res, requestId); - // eslint-disable-next-line no-constant-condition - while (true) { - this._forwardRequest(proxyRequest); - initialMessage = await currentQueue.dequeue(); - - const initialStatus = Number(initialMessage?.status); - if ( - initialMessage.event_type === "error" && - !isUserAbortedError(initialMessage) && - Number.isFinite(initialStatus) && - this.config?.immediateSwitchStatusCodes?.includes(initialStatus) - ) { - this.logger.warn( - `[Request] OpenAI real stream received ${initialStatus}, switching account and retrying...` - ); - const switched = await this._performImmediateSwitchRetry( - initialMessage, - requestId, - immediateSwitchTracker - ); - if (!switched) { - skipFinalFailureSwitch = true; - break; - } + if (useRealStream) { + let currentQueue = messageQueue; + let initialMessage; + let skipFinalFailureSwitch = false; + const immediateSwitchTracker = this._createImmediateSwitchTracker(); - try { - currentQueue.close("retry_after_429"); - } catch { - /* empty */ - } - this._advanceProxyRequestAttempt(proxyRequest); - currentQueue = this.connectionRegistry.createMessageQueue( - requestId, + // eslint-disable-next-line no-constant-condition + while (true) { + this._getUsageStatsService()?.recordAttempt( + proxyRequest.request_id, this.currentAuthIndex, - proxyRequest.request_attempt_id + this._getAccountNameForIndex(this.currentAuthIndex) ); - continue; - } - - break; - } - - if (initialMessage.event_type === "error") { - this._logFinalRequestFailure(initialMessage, "OpenAI real stream"); + this._forwardRequest(proxyRequest); + initialMessage = await currentQueue.dequeue(); + + const initialStatus = Number(initialMessage?.status); + if ( + initialMessage.event_type === "error" && + !isUserAbortedError(initialMessage) && + Number.isFinite(initialStatus) && + this.config?.immediateSwitchStatusCodes?.includes(initialStatus) + ) { + this.logger.warn( + `[Request] OpenAI real stream received ${initialStatus}, switching account and retrying...` + ); + const switched = await this._performImmediateSwitchRetry( + initialMessage, + requestId, + immediateSwitchTracker + ); + if (!switched) { + skipFinalFailureSwitch = true; + break; + } - // Send standard HTTP error response - this._sendErrorResponse(res, initialMessage.status || 500, initialMessage.message); + try { + currentQueue.close("retry_after_429"); + } catch { + /* empty */ + } + this._advanceProxyRequestAttempt(proxyRequest); + currentQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + continue; + } - // Avoid switching account if the error is just a connection reset - if (!skipFinalFailureSwitch && !this._isConnectionResetError(initialMessage)) { - await this.authSwitcher.handleRequestFailureAndSwitch(initialMessage, null); - } else if (skipFinalFailureSwitch) { - this.logger.info( - "[Request] Immediate-switch retries exhausted, skipping additional account switch." - ); - } else { - this.logger.info( - "[Request] Failure due to connection reset (Real Stream), skipping account switch." - ); + break; } - return; - } - - if (this.authSwitcher.failureCount > 0) { - this.logger.debug( - `✅ [Auth] OpenAI interface request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` - ); - this.authSwitcher.failureCount = 0; - } - - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - this.logger.info(`[Request] OpenAI streaming response (Real Mode) started...`); - await this._streamOpenAIResponse(currentQueue, res, model); - } else { - // OpenAI Fake Stream / Non-Stream mode - // Set up keep-alive timer for fake stream mode to prevent client timeout - let connectionMaintainer; - if (isOpenAIStream) { - const scheduleNextKeepAlive = () => { - const randomInterval = 12000 + Math.floor(Math.random() * 6000); // 12 - 18 seconds - connectionMaintainer = setTimeout(() => { - if (!res.headersSent) { - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - } - if (!res.writableEnded) { - res.write(": keep-alive\n\n"); - scheduleNextKeepAlive(); - } - }, randomInterval); - }; - scheduleNextKeepAlive(); - } - try { - const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + if (initialMessage.event_type === "error") { + this._logFinalRequestFailure(initialMessage, "OpenAI real stream"); - if (!result.success) { - this._logFinalRequestFailure(result.error, "OpenAI fake/non-stream"); - // Send standard HTTP error response for both streaming and non-streaming - if (connectionMaintainer) clearTimeout(connectionMaintainer); - if (isOpenAIStream && res.headersSent) { - // If keep-alives already started the SSE response, send an SSE error event instead of JSON. - this._handleRequestError(result.error, res, "openai"); - } else { - this._sendErrorResponse(res, result.error.status || 500, result.error.message); - } + // Send standard HTTP error response + this._sendErrorResponse(res, initialMessage.status || 500, initialMessage.message); // Avoid switching account if the error is just a connection reset - if (!result.error.skipAccountSwitch && !this._isConnectionResetError(result.error)) { - await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); - } else if (result.error.skipAccountSwitch) { + if (!skipFinalFailureSwitch && !this._isConnectionResetError(initialMessage)) { + await this.authSwitcher.handleRequestFailureAndSwitch(initialMessage, null); + } else if (skipFinalFailureSwitch) { this.logger.info( "[Request] Immediate-switch retries exhausted, skipping additional account switch." ); } else { this.logger.info( - "[Request] Failure due to connection reset (OpenAI), skipping account switch." + "[Request] Failure due to connection reset (Real Stream), skipping account switch." ); } return; } if (this.authSwitcher.failureCount > 0) { - this.logger.debug(`✅ [Auth] OpenAI interface request successful - failure count reset to 0`); + this.logger.debug( + `✅ [Auth] OpenAI interface request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` + ); this.authSwitcher.failureCount = 0; } - // Use the queue that successfully received the initial message - const activeQueue = result.queue; - + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + this.logger.info(`[Request] OpenAI streaming response (Real Mode) started...`); + await this._streamOpenAIResponse(currentQueue, res, model); + } else { + // OpenAI Fake Stream / Non-Stream mode + // Set up keep-alive timer for fake stream mode to prevent client timeout + let connectionMaintainer; if (isOpenAIStream) { - // Fake stream - ensure headers are set before sending data - if (!res.headersSent) { - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - } - // Clear keep-alive timer as we are about to send real data - if (connectionMaintainer) clearTimeout(connectionMaintainer); - - this.logger.info(`[Request] OpenAI streaming response (Fake Mode) started...`); - let fullBody = ""; - let hadStreamError = false; - try { - // eslint-disable-next-line no-constant-condition - while (true) { - const message = await activeQueue.dequeue(this.timeouts.FAKE_STREAM); - if (message.type === "STREAM_END") { - break; + const scheduleNextKeepAlive = () => { + const randomInterval = 12000 + Math.floor(Math.random() * 6000); // 12 - 18 seconds + connectionMaintainer = setTimeout(() => { + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); } - - if (message.event_type === "error") { - this.logger.error( - `[Request] Error received during OpenAI fake stream: ${message.message}` - ); - hadStreamError = true; - // Check if response is still writable before attempting to write - if (this._isResponseWritable(res)) { - try { - res.write( - `data: ${JSON.stringify({ error: { code: 500, message: message.message, type: "api_error" } })}\n\n` - ); - } catch (writeError) { - this.logger.debug( - `[Request] Failed to write error to OpenAI fake stream: ${writeError.message}` - ); - } - } - break; + if (!res.writableEnded) { + res.write(": keep-alive\n\n"); + scheduleNextKeepAlive(); } + }, randomInterval); + }; + scheduleNextKeepAlive(); + } - if (message.data) fullBody += message.data; - } - if (hadStreamError) { - // Backend errored; don't attempt to translate/send a "normal" stream afterwards. - return; + try { + const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + + if (!result.success) { + this._logFinalRequestFailure(result.error, "OpenAI fake/non-stream"); + // Send standard HTTP error response for both streaming and non-streaming + if (connectionMaintainer) clearTimeout(connectionMaintainer); + if (isOpenAIStream && res.headersSent) { + // If keep-alives already started the SSE response, send an SSE error event instead of JSON. + this._handleRequestError(result.error, res, "openai"); + } else { + this._sendErrorResponse(res, result.error.status || 500, result.error.message); } - const streamState = {}; - const translatedChunk = this.formatConverter.translateGoogleToOpenAIStream( - fullBody, - model, - streamState - ); - if (this._isResponseWritable(res)) { - try { - if (translatedChunk) { - res.write(translatedChunk); - } - res.write("data: [DONE]\n\n"); - } catch (writeError) { - this.logger.debug( - `[Request] Failed to write final fake OpenAI stream chunks: ${writeError.message}` - ); - } + + // Avoid switching account if the error is just a connection reset + if (!result.error.skipAccountSwitch && !this._isConnectionResetError(result.error)) { + await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); + } else if (result.error.skipAccountSwitch) { + this.logger.info( + "[Request] Immediate-switch retries exhausted, skipping additional account switch." + ); } else { - this.logger.debug( - "[Request] Response no longer writable before final fake OpenAI stream chunks." + this.logger.info( + "[Request] Failure due to connection reset (OpenAI), skipping account switch." ); } - this.logger.info("[Request] Fake mode: Complete content sent at once."); - } catch (error) { - // Classify error type and send appropriate response - this._handleFakeStreamError(error, res, "openai"); + return; } - } else { - // Non-stream - await this._sendOpenAINonStreamResponse(activeQueue, res, model); - } - } finally { - if (connectionMaintainer) clearTimeout(connectionMaintainer); - } - } - } catch (error) { - // Handle queue timeout by notifying browser - this._handleQueueTimeout(error, requestId); - this._handleRequestError(error, res); - } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (this.needsSwitchingAfterRequest) { - this.logger.info( - `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` - ); - this.authSwitcher.switchToNextAuth().catch(err => { - this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); - }); - this.needsSwitchingAfterRequest = false; + if (this.authSwitcher.failureCount > 0) { + this.logger.debug( + `✅ [Auth] OpenAI interface request successful - failure count reset to 0` + ); + this.authSwitcher.failureCount = 0; + } + + // Use the queue that successfully received the initial message + const activeQueue = result.queue; + + if (isOpenAIStream) { + // Fake stream - ensure headers are set before sending data + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + } + // Clear keep-alive timer as we are about to send real data + if (connectionMaintainer) clearTimeout(connectionMaintainer); + + this.logger.info(`[Request] OpenAI streaming response (Fake Mode) started...`); + let fullBody = ""; + let hadStreamError = false; + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const message = await activeQueue.dequeue(this.timeouts.FAKE_STREAM); + if (message.type === "STREAM_END") { + break; + } + + if (message.event_type === "error") { + this.logger.error( + `[Request] Error received during OpenAI fake stream: ${message.message}` + ); + this._markTrackedResponseError(res, message.message, 500); + hadStreamError = true; + // Check if response is still writable before attempting to write + if (this._isResponseWritable(res)) { + try { + res.write( + `data: ${JSON.stringify({ error: { code: 500, message: message.message, type: "api_error" } })}\n\n` + ); + } catch (writeError) { + this.logger.debug( + `[Request] Failed to write error to OpenAI fake stream: ${writeError.message}` + ); + } + } + break; + } + + if (message.data) fullBody += message.data; + } + if (hadStreamError) { + // Backend errored; don't attempt to translate/send a "normal" stream afterwards. + return; + } + const streamState = {}; + const translatedChunk = this.formatConverter.translateGoogleToOpenAIStream( + fullBody, + model, + streamState + ); + if (this._isResponseWritable(res)) { + try { + if (translatedChunk) { + res.write(translatedChunk); + } + res.write("data: [DONE]\n\n"); + } catch (writeError) { + this.logger.debug( + `[Request] Failed to write final fake OpenAI stream chunks: ${writeError.message}` + ); + } + } else { + this.logger.debug( + "[Request] Response no longer writable before final fake OpenAI stream chunks." + ); + } + this.logger.info("[Request] Fake mode: Complete content sent at once."); + } catch (error) { + // Classify error type and send appropriate response + this._handleFakeStreamError(error, res, "openai"); + } + } else { + // Non-stream + await this._sendOpenAINonStreamResponse(activeQueue, res, model); + } + } finally { + if (connectionMaintainer) clearTimeout(connectionMaintainer); + } + } + } catch (error) { + // Handle queue timeout by notifying browser + this._handleQueueTimeout(error, requestId); + + this._handleRequestError(error, res); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (this.needsSwitchingAfterRequest) { + this.logger.info( + `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` + ); + this.authSwitcher.switchToNextAuth().catch(err => { + this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); + }); + this.needsSwitchingAfterRequest = false; + } + if (!res.writableEnded) res.end(); } - if (!res.writableEnded) res.end(); + } finally { + this._finalizeTrackedRequest(requestId, res); } } // Process OpenAI Response API format requests async processOpenAIResponseRequest(req, res) { const requestId = this._generateRequestId(); + this._startTrackedRequest(requestId, req, { + apiFormat: "response_api", + isStreaming: req.body.stream === true, + requestCategory: "generation", + streamMode: req.body.stream === true ? this.serverSystem.streamingMode : null, + }); res.__proxyResponseStreamMode = null; - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } - - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res); - if (!ready) return; - } - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } - - const isOpenAIStream = req.body.stream === true; - const normalizeInstructions = value => { - if (typeof value === "string") return value; - if (!Array.isArray(value)) return null; - const chunks = []; - for (const item of value) { - if (!item || typeof item !== "object") continue; - const content = item.content; - if (typeof content === "string") { - chunks.push(content); - continue; - } - if (!Array.isArray(content)) continue; - for (const part of content) { - if (!part || typeof part !== "object") continue; - if (part.type === "text" || part.type === "input_text") { - if (typeof part.text === "string" && part.text) chunks.push(part.text); + try { + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; + } + + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res); + if (!ready) return; + } + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } + + const isOpenAIStream = req.body.stream === true; + const normalizeInstructions = value => { + if (typeof value === "string") return value; + if (!Array.isArray(value)) return null; + const chunks = []; + for (const item of value) { + if (!item || typeof item !== "object") continue; + const content = item.content; + if (typeof content === "string") { + chunks.push(content); + continue; + } + if (!Array.isArray(content)) continue; + for (const part of content) { + if (!part || typeof part !== "object") continue; + if (part.type === "text" || part.type === "input_text") { + if (typeof part.text === "string" && part.text) chunks.push(part.text); + } } } - } - return chunks.length > 0 ? chunks.join("\n") : null; - }; - const responseDefaultsRaw = { - instructions: normalizeInstructions(req.body?.instructions), - max_output_tokens: req.body?.max_output_tokens ?? null, - metadata: - req.body?.metadata && typeof req.body.metadata === "object" && !Array.isArray(req.body.metadata) - ? req.body.metadata - : {}, - parallel_tool_calls: - typeof req.body?.parallel_tool_calls === "boolean" ? req.body.parallel_tool_calls : true, - reasoning: - req.body?.reasoning && typeof req.body.reasoning === "object" && !Array.isArray(req.body.reasoning) - ? req.body.reasoning - : undefined, - temperature: typeof req.body?.temperature === "number" ? req.body.temperature : undefined, - text: - req.body?.text && typeof req.body.text === "object" && !Array.isArray(req.body.text) - ? req.body.text - : undefined, - tool_choice: req.body?.tool_choice ?? undefined, - tools: Array.isArray(req.body?.tools) ? req.body.tools : undefined, - top_p: typeof req.body?.top_p === "number" ? req.body.top_p : undefined, - truncation: typeof req.body?.truncation === "string" ? req.body.truncation : undefined, - user: typeof req.body?.user === "string" ? req.body.user : undefined, - }; - - const responseDefaults = Object.fromEntries( - Object.entries(responseDefaultsRaw).filter(([, v]) => v !== undefined) - ); - const systemStreamMode = this.serverSystem.streamingMode; + return chunks.length > 0 ? chunks.join("\n") : null; + }; + const responseDefaultsRaw = { + instructions: normalizeInstructions(req.body?.instructions), + max_output_tokens: req.body?.max_output_tokens ?? null, + metadata: + req.body?.metadata && typeof req.body.metadata === "object" && !Array.isArray(req.body.metadata) + ? req.body.metadata + : {}, + parallel_tool_calls: + typeof req.body?.parallel_tool_calls === "boolean" ? req.body.parallel_tool_calls : true, + reasoning: + req.body?.reasoning && typeof req.body.reasoning === "object" && !Array.isArray(req.body.reasoning) + ? req.body.reasoning + : undefined, + temperature: typeof req.body?.temperature === "number" ? req.body.temperature : undefined, + text: + req.body?.text && typeof req.body.text === "object" && !Array.isArray(req.body.text) + ? req.body.text + : undefined, + tool_choice: req.body?.tool_choice ?? undefined, + tools: Array.isArray(req.body?.tools) ? req.body.tools : undefined, + top_p: typeof req.body?.top_p === "number" ? req.body.top_p : undefined, + truncation: typeof req.body?.truncation === "string" ? req.body.truncation : undefined, + user: typeof req.body?.user === "string" ? req.body.user : undefined, + }; - // Handle usage counting - const usageCount = this.authSwitcher.incrementUsageCount(); - if (usageCount > 0) { - const rotationCountText = - this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; - this.logger.info( - `[Request] OpenAI Response generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` + const responseDefaults = Object.fromEntries( + Object.entries(responseDefaultsRaw).filter(([, v]) => v !== undefined) ); - if (this.authSwitcher.shouldSwitchByUsage()) { - this.needsSwitchingAfterRequest = true; - } - } - - // Translate OpenAI Response format to Google format - let googleBody, model, modelStreamingMode; - try { - const result = await this.formatConverter.translateOpenAIResponseToGoogle(req.body); - googleBody = result.googleRequest; - model = result.cleanModelName; - modelStreamingMode = result.modelStreamingMode || null; - } catch (error) { - this.logger.error(`[Adapter] OpenAI Response request translation failed: ${error.message}`); - return this._sendErrorResponse(res, 400, "Invalid OpenAI Response request format."); - } + const systemStreamMode = this.serverSystem.streamingMode; - const effectiveStreamMode = modelStreamingMode || systemStreamMode; - const useRealStream = isOpenAIStream && effectiveStreamMode === "real"; - - const googleEndpoint = useRealStream ? "streamGenerateContent" : "generateContent"; - const proxyRequest = { - body: JSON.stringify(googleBody), - headers: { "Content-Type": "application/json" }, - is_generative: true, - method: "POST", - path: `/v1beta/models/${model}:${googleEndpoint}`, - query_params: useRealStream ? { alt: "sse" } : {}, - request_id: requestId, - streaming_mode: useRealStream ? "real" : "fake", - }; - this._initializeProxyRequestAttempt(proxyRequest); - res.__proxyResponseStreamMode = isOpenAIStream ? (useRealStream ? "real" : "fake") : null; + // Handle usage counting + const usageCount = this.authSwitcher.incrementUsageCount(); + if (usageCount > 0) { + const rotationCountText = + this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; + this.logger.info( + `[Request] OpenAI Response generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` + ); + if (this.authSwitcher.shouldSwitchByUsage()) { + this.needsSwitchingAfterRequest = true; + } + } - try { - // Create message queue inside try-catch to handle invalid authIndex - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + // Translate OpenAI Response format to Google format + let googleBody, model, modelStreamingMode; + try { + const result = await this.formatConverter.translateOpenAIResponseToGoogle(req.body); + googleBody = result.googleRequest; + model = result.cleanModelName; + modelStreamingMode = result.modelStreamingMode || null; + } catch (error) { + this.logger.error(`[Adapter] OpenAI Response request translation failed: ${error.message}`); + return this._sendErrorResponse(res, 400, "Invalid OpenAI Response request format."); + } + + const effectiveStreamMode = modelStreamingMode || systemStreamMode; + const useRealStream = isOpenAIStream && effectiveStreamMode === "real"; + + const googleEndpoint = useRealStream ? "streamGenerateContent" : "generateContent"; + const proxyRequest = { + body: JSON.stringify(googleBody), + headers: { "Content-Type": "application/json" }, + is_generative: true, + method: "POST", + path: `/v1beta/models/${model}:${googleEndpoint}`, + query_params: useRealStream ? { alt: "sse" } : {}, + request_id: requestId, + streaming_mode: useRealStream ? "real" : "fake", + }; + this._initializeProxyRequestAttempt(proxyRequest); + res.__proxyResponseStreamMode = isOpenAIStream ? (useRealStream ? "real" : "fake") : null; + this._updateTrackedRequest(requestId, { + isStreaming: isOpenAIStream, + model, + path: proxyRequest.path, + requestCategory: "generation", + ...(isOpenAIStream ? { streamMode: useRealStream ? "real" : "fake" } : {}), + }); - if (useRealStream) { - let currentQueue = messageQueue; - let initialMessage; - let skipFinalFailureSwitch = false; - const immediateSwitchTracker = this._createImmediateSwitchTracker(); + try { + // Create message queue inside try-catch to handle invalid authIndex + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + this._setupClientDisconnectHandler(res, requestId); - // eslint-disable-next-line no-constant-condition - while (true) { - this._forwardRequest(proxyRequest); - initialMessage = await currentQueue.dequeue(); - - const initialStatus = Number(initialMessage?.status); - if ( - initialMessage.event_type === "error" && - !isUserAbortedError(initialMessage) && - Number.isFinite(initialStatus) && - this.config?.immediateSwitchStatusCodes?.includes(initialStatus) - ) { - this.logger.warn( - `[Request] OpenAI Response API real stream received ${initialStatus}, switching account and retrying...` - ); - const switched = await this._performImmediateSwitchRetry( - initialMessage, - requestId, - immediateSwitchTracker - ); - if (!switched) { - skipFinalFailureSwitch = true; - break; - } + if (useRealStream) { + let currentQueue = messageQueue; + let initialMessage; + let skipFinalFailureSwitch = false; + const immediateSwitchTracker = this._createImmediateSwitchTracker(); - try { - currentQueue.close("retry_after_429"); - } catch { - /* empty */ - } - this._advanceProxyRequestAttempt(proxyRequest); - currentQueue = this.connectionRegistry.createMessageQueue( - requestId, + // eslint-disable-next-line no-constant-condition + while (true) { + this._getUsageStatsService()?.recordAttempt( + proxyRequest.request_id, this.currentAuthIndex, - proxyRequest.request_attempt_id + this._getAccountNameForIndex(this.currentAuthIndex) ); - continue; - } - - break; - } - - if (initialMessage.event_type === "error") { - this._logFinalRequestFailure(initialMessage, "OpenAI Response API real stream"); + this._forwardRequest(proxyRequest); + initialMessage = await currentQueue.dequeue(); + + const initialStatus = Number(initialMessage?.status); + if ( + initialMessage.event_type === "error" && + !isUserAbortedError(initialMessage) && + Number.isFinite(initialStatus) && + this.config?.immediateSwitchStatusCodes?.includes(initialStatus) + ) { + this.logger.warn( + `[Request] OpenAI Response API real stream received ${initialStatus}, switching account and retrying...` + ); + const switched = await this._performImmediateSwitchRetry( + initialMessage, + requestId, + immediateSwitchTracker + ); + if (!switched) { + skipFinalFailureSwitch = true; + break; + } - // Send standard HTTP error response - this._sendErrorResponse(res, initialMessage.status || 500, initialMessage.message); + try { + currentQueue.close("retry_after_429"); + } catch { + /* empty */ + } + this._advanceProxyRequestAttempt(proxyRequest); + currentQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + continue; + } - // Avoid switching account if the error is just a connection reset - if (!skipFinalFailureSwitch && !this._isConnectionResetError(initialMessage)) { - await this.authSwitcher.handleRequestFailureAndSwitch(initialMessage, null); - } else if (skipFinalFailureSwitch) { - this.logger.info( - "[Request] Immediate-switch retries exhausted, skipping additional account switch." - ); - } else { - this.logger.info( - "[Request] Failure due to connection reset (Real Stream), skipping account switch." - ); + break; } - return; - } - - if (this.authSwitcher.failureCount > 0) { - this.logger.debug( - `✅ [Auth] OpenAI Response API request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` - ); - this.authSwitcher.failureCount = 0; - } - - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - this.logger.info(`[Request] OpenAI Response API streaming response (Real Mode) started...`); - await this._streamOpenAIResponseAPIResponse(currentQueue, res, model, { - responseDefaults, - }); - } else { - // OpenAI Response API Fake Stream / Non-Stream mode - // Set up keep-alive timer for fake stream mode to prevent client timeout - let connectionMaintainer; - if (isOpenAIStream) { - const scheduleNextKeepAlive = () => { - const randomInterval = 12000 + Math.floor(Math.random() * 6000); // 12 - 18 seconds - connectionMaintainer = setTimeout(() => { - if (!res.headersSent) { - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - } - if (!res.writableEnded) { - res.write(": keep-alive\n\n"); - scheduleNextKeepAlive(); - } - }, randomInterval); - }; - scheduleNextKeepAlive(); - } - try { - const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + if (initialMessage.event_type === "error") { + this._logFinalRequestFailure(initialMessage, "OpenAI Response API real stream"); - if (!result.success) { - this._logFinalRequestFailure(result.error, "OpenAI Response API fake/non-stream"); - // Send standard HTTP error response for both streaming and non-streaming - if (connectionMaintainer) clearTimeout(connectionMaintainer); - if (isOpenAIStream && res.headersSent) { - // If keep-alives already started the SSE response, send an SSE error event instead of JSON. - this._handleRequestError(result.error, res, "response_api"); - } else { - this._sendErrorResponse(res, result.error.status || 500, result.error.message); - } + // Send standard HTTP error response + this._sendErrorResponse(res, initialMessage.status || 500, initialMessage.message); // Avoid switching account if the error is just a connection reset - if (!result.error.skipAccountSwitch && !this._isConnectionResetError(result.error)) { - await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); - } else if (result.error.skipAccountSwitch) { + if (!skipFinalFailureSwitch && !this._isConnectionResetError(initialMessage)) { + await this.authSwitcher.handleRequestFailureAndSwitch(initialMessage, null); + } else if (skipFinalFailureSwitch) { this.logger.info( "[Request] Immediate-switch retries exhausted, skipping additional account switch." ); } else { this.logger.info( - "[Request] Failure due to connection reset (Response API), skipping account switch." + "[Request] Failure due to connection reset (Real Stream), skipping account switch." ); } return; @@ -1347,327 +1465,368 @@ class RequestHandler { if (this.authSwitcher.failureCount > 0) { this.logger.debug( - `✅ [Auth] OpenAI Response API request successful - failure count reset to 0` + `✅ [Auth] OpenAI Response API request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` ); this.authSwitcher.failureCount = 0; } - // Use the queue that successfully received the initial message - const activeQueue = result.queue; - + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + this.logger.info(`[Request] OpenAI Response API streaming response (Real Mode) started...`); + await this._streamOpenAIResponseAPIResponse(currentQueue, res, model, { + responseDefaults, + }); + } else { + // OpenAI Response API Fake Stream / Non-Stream mode + // Set up keep-alive timer for fake stream mode to prevent client timeout + let connectionMaintainer; if (isOpenAIStream) { - // Fake stream - ensure headers are set before sending data - if (!res.headersSent) { - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - } - // Clear keep-alive timer as we are about to send real data - if (connectionMaintainer) clearTimeout(connectionMaintainer); - - this.logger.info(`[Request] OpenAI Response API streaming response (Fake Mode) started...`); - let fullBody = ""; - if (res.__responseApiSeq == null) res.__responseApiSeq = 0; - let hadStreamError = false; - try { - // eslint-disable-next-line no-constant-condition - while (true) { - const message = await activeQueue.dequeue(this.timeouts.FAKE_STREAM); - if (message.type === "STREAM_END") { - break; + const scheduleNextKeepAlive = () => { + const randomInterval = 12000 + Math.floor(Math.random() * 6000); // 12 - 18 seconds + connectionMaintainer = setTimeout(() => { + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); } - - if (message.event_type === "error") { - this.logger.error( - `[Request] Error received during OpenAI Response API fake stream: ${message.message}` - ); - hadStreamError = true; - // Check if response is still writable before attempting to write - if (this._isResponseWritable(res)) { - try { - res.__responseApiSeq += 1; - res.write( - `event: error\ndata: ${JSON.stringify({ - code: "api_error", - message: message.message, - param: null, - sequence_number: res.__responseApiSeq, - type: "error", - })}\n\n` - ); - } catch (writeError) { - this.logger.debug( - `[Request] Failed to write error to OpenAI Response API fake stream: ${writeError.message}` - ); - } - } - break; + if (!res.writableEnded) { + res.write(": keep-alive\n\n"); + scheduleNextKeepAlive(); } + }, randomInterval); + }; + scheduleNextKeepAlive(); + } - if (message.data) fullBody += message.data; + try { + const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + + if (!result.success) { + this._logFinalRequestFailure(result.error, "OpenAI Response API fake/non-stream"); + // Send standard HTTP error response for both streaming and non-streaming + if (connectionMaintainer) clearTimeout(connectionMaintainer); + if (isOpenAIStream && res.headersSent) { + // If keep-alives already started the SSE response, send an SSE error event instead of JSON. + this._handleRequestError(result.error, res, "response_api"); + } else { + this._sendErrorResponse(res, result.error.status || 500, result.error.message); } - // If backend errored, don't attempt to translate/send a "normal" Responses stream afterwards. - if (hadStreamError) { - return; + // Avoid switching account if the error is just a connection reset + if (!result.error.skipAccountSwitch && !this._isConnectionResetError(result.error)) { + await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); + } else if (result.error.skipAccountSwitch) { + this.logger.info( + "[Request] Immediate-switch retries exhausted, skipping additional account switch." + ); + } else { + this.logger.info( + "[Request] Failure due to connection reset (Response API), skipping account switch." + ); } + return; + } - const streamState = {}; - streamState.responseDefaults = responseDefaults; - const translatedChunk = this.formatConverter.translateGoogleToResponseAPIStream( - fullBody, - model, - streamState + if (this.authSwitcher.failureCount > 0) { + this.logger.debug( + `✅ [Auth] OpenAI Response API request successful - failure count reset to 0` ); - if (this._isResponseWritable(res)) { - try { - if (translatedChunk) { - res.write(translatedChunk); + this.authSwitcher.failureCount = 0; + } + + // Use the queue that successfully received the initial message + const activeQueue = result.queue; + + if (isOpenAIStream) { + // Fake stream - ensure headers are set before sending data + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + } + // Clear keep-alive timer as we are about to send real data + if (connectionMaintainer) clearTimeout(connectionMaintainer); + + this.logger.info(`[Request] OpenAI Response API streaming response (Fake Mode) started...`); + let fullBody = ""; + if (res.__responseApiSeq == null) res.__responseApiSeq = 0; + let hadStreamError = false; + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const message = await activeQueue.dequeue(this.timeouts.FAKE_STREAM); + if (message.type === "STREAM_END") { + break; + } + + if (message.event_type === "error") { + this.logger.error( + `[Request] Error received during OpenAI Response API fake stream: ${message.message}` + ); + this._markTrackedResponseError(res, message.message, 500); + hadStreamError = true; + // Check if response is still writable before attempting to write + if (this._isResponseWritable(res)) { + try { + res.__responseApiSeq += 1; + res.write( + `event: error\ndata: ${JSON.stringify({ + code: "api_error", + message: message.message, + param: null, + sequence_number: res.__responseApiSeq, + type: "error", + })}\n\n` + ); + } catch (writeError) { + this.logger.debug( + `[Request] Failed to write error to OpenAI Response API fake stream: ${writeError.message}` + ); + } + } + break; + } + + if (message.data) fullBody += message.data; + } + + // If backend errored, don't attempt to translate/send a "normal" Responses stream afterwards. + if (hadStreamError) { + return; + } + + const streamState = {}; + streamState.responseDefaults = responseDefaults; + const translatedChunk = this.formatConverter.translateGoogleToResponseAPIStream( + fullBody, + model, + streamState + ); + if (this._isResponseWritable(res)) { + try { + if (translatedChunk) { + res.write(translatedChunk); + } + } catch (writeError) { + this.logger.debug( + `[Request] Failed to write final fake OpenAI Response API stream chunks: ${writeError.message}` + ); } - } catch (writeError) { + } else { this.logger.debug( - `[Request] Failed to write final fake OpenAI Response API stream chunks: ${writeError.message}` + "[Request] Response no longer writable before final fake OpenAI Response API stream chunks." ); } - } else { - this.logger.debug( - "[Request] Response no longer writable before final fake OpenAI Response API stream chunks." - ); + this.logger.info("[Request] Fake mode: Complete content sent at once."); + } catch (error) { + // Classify error type and send appropriate response + this._handleFakeStreamError(error, res, "response_api"); } - this.logger.info("[Request] Fake mode: Complete content sent at once."); - } catch (error) { - // Classify error type and send appropriate response - this._handleFakeStreamError(error, res, "response_api"); + } else { + // Non-stream + await this._sendOpenAIResponseAPINonStreamResponse( + activeQueue, + res, + model, + responseDefaults + ); } - } else { - // Non-stream - await this._sendOpenAIResponseAPINonStreamResponse(activeQueue, res, model, responseDefaults); + } finally { + if (connectionMaintainer) clearTimeout(connectionMaintainer); } - } finally { - if (connectionMaintainer) clearTimeout(connectionMaintainer); } - } - } catch (error) { - // Handle queue timeout by notifying browser - this._handleQueueTimeout(error, requestId); + } catch (error) { + // Handle queue timeout by notifying browser + this._handleQueueTimeout(error, requestId); - this._handleRequestError(error, res, "response_api"); - } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (this.needsSwitchingAfterRequest) { - this.logger.info( - `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` - ); - this.authSwitcher.switchToNextAuth().catch(err => { - this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); - }); - this.needsSwitchingAfterRequest = false; + this._handleRequestError(error, res, "response_api"); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (this.needsSwitchingAfterRequest) { + this.logger.info( + `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` + ); + this.authSwitcher.switchToNextAuth().catch(err => { + this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); + }); + this.needsSwitchingAfterRequest = false; + } + if (!res.writableEnded) res.end(); } - if (!res.writableEnded) res.end(); + } finally { + this._finalizeTrackedRequest(requestId, res); } } // Process Claude API format requests async processClaudeRequest(req, res) { const requestId = this._generateRequestId(); + this._startTrackedRequest(requestId, req, { + apiFormat: "claude", + isStreaming: req.body.stream === true, + requestCategory: "generation", + streamMode: req.body.stream === true ? this.serverSystem.streamingMode : null, + }); res.__proxyResponseStreamMode = null; - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } - - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res, { - sendError: (status, message) => this._sendClaudeErrorResponse(res, status, "overloaded_error", message), - }); - if (!ready) return; - } - - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } - - const isClaudeStream = req.body.stream === true; - const systemStreamMode = this.serverSystem.streamingMode; - - // Handle usage counting - const usageCount = this.authSwitcher.incrementUsageCount(); - if (usageCount > 0) { - const rotationCountText = - this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; - this.logger.info( - `[Request] Claude generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` - ); - if (this.authSwitcher.shouldSwitchByUsage()) { - this.needsSwitchingAfterRequest = true; - } - } - - // Translate Claude format to Google format - let googleBody, model, modelStreamingMode; try { - const result = await this.formatConverter.translateClaudeToGoogle(req.body); - googleBody = result.googleRequest; - model = result.cleanModelName; - modelStreamingMode = result.modelStreamingMode || null; - } catch (error) { - this.logger.error(`[Adapter] Claude request translation failed: ${error.message}`); - return this._sendClaudeErrorResponse(res, 400, "invalid_request_error", "Invalid Claude request format."); - } - - const effectiveStreamMode = modelStreamingMode || systemStreamMode; - const useRealStream = isClaudeStream && effectiveStreamMode === "real"; - - const googleEndpoint = useRealStream ? "streamGenerateContent" : "generateContent"; - const proxyRequest = { - body: JSON.stringify(googleBody), - headers: { "Content-Type": "application/json" }, - is_generative: true, - method: "POST", - path: `/v1beta/models/${model}:${googleEndpoint}`, - query_params: useRealStream ? { alt: "sse" } : {}, - request_id: requestId, - streaming_mode: useRealStream ? "real" : "fake", - }; - this._initializeProxyRequestAttempt(proxyRequest); - res.__proxyResponseStreamMode = isClaudeStream ? (useRealStream ? "real" : "fake") : null; + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; + } + + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res, { + sendError: (status, message) => + this._sendClaudeErrorResponse(res, status, "overloaded_error", message), + }); + if (!ready) return; + } - try { - // Create message queue inside try-catch to handle invalid authIndex - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } - if (useRealStream) { - let currentQueue = messageQueue; - let initialMessage; - let skipFinalFailureSwitch = false; - const immediateSwitchTracker = this._createImmediateSwitchTracker(); + const isClaudeStream = req.body.stream === true; + const systemStreamMode = this.serverSystem.streamingMode; - // eslint-disable-next-line no-constant-condition - while (true) { - this._forwardRequest(proxyRequest); - initialMessage = await currentQueue.dequeue(); - - const initialStatus = Number(initialMessage?.status); - if ( - initialMessage.event_type === "error" && - !isUserAbortedError(initialMessage) && - Number.isFinite(initialStatus) && - this.config?.immediateSwitchStatusCodes?.includes(initialStatus) - ) { - this.logger.warn( - `[Request] Claude real stream received ${initialStatus}, switching account and retrying...` - ); - const switched = await this._performImmediateSwitchRetry( - initialMessage, - requestId, - immediateSwitchTracker - ); - if (!switched) { - skipFinalFailureSwitch = true; - break; - } + // Handle usage counting + const usageCount = this.authSwitcher.incrementUsageCount(); + if (usageCount > 0) { + const rotationCountText = + this.config.switchOnUses > 0 ? `${usageCount}/${this.config.switchOnUses}` : `${usageCount}`; + this.logger.info( + `[Request] Claude generation request - account rotation count: ${rotationCountText} (Current account: ${this.currentAuthIndex})` + ); + if (this.authSwitcher.shouldSwitchByUsage()) { + this.needsSwitchingAfterRequest = true; + } + } - try { - currentQueue.close("retry_after_429"); - } catch { - /* empty */ - } - this._advanceProxyRequestAttempt(proxyRequest); - currentQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - continue; - } + // Translate Claude format to Google format + let googleBody, model, modelStreamingMode; + try { + const result = await this.formatConverter.translateClaudeToGoogle(req.body); + googleBody = result.googleRequest; + model = result.cleanModelName; + modelStreamingMode = result.modelStreamingMode || null; + } catch (error) { + this.logger.error(`[Adapter] Claude request translation failed: ${error.message}`); + return this._sendClaudeErrorResponse( + res, + 400, + "invalid_request_error", + "Invalid Claude request format." + ); + } - break; - } + const effectiveStreamMode = modelStreamingMode || systemStreamMode; + const useRealStream = isClaudeStream && effectiveStreamMode === "real"; - if (initialMessage.event_type === "error") { - this._logFinalRequestFailure(initialMessage, "Claude real stream"); - this._sendClaudeErrorResponse( - res, - initialMessage.status || 500, - "api_error", - initialMessage.message - ); - if (!skipFinalFailureSwitch && !this._isConnectionResetError(initialMessage)) { - await this.authSwitcher.handleRequestFailureAndSwitch(initialMessage, null); - } else if (skipFinalFailureSwitch) { - this.logger.info( - "[Request] Immediate-switch retries exhausted, skipping additional account switch." - ); - } - return; - } + const googleEndpoint = useRealStream ? "streamGenerateContent" : "generateContent"; + const proxyRequest = { + body: JSON.stringify(googleBody), + headers: { "Content-Type": "application/json" }, + is_generative: true, + method: "POST", + path: `/v1beta/models/${model}:${googleEndpoint}`, + query_params: useRealStream ? { alt: "sse" } : {}, + request_id: requestId, + streaming_mode: useRealStream ? "real" : "fake", + }; + this._initializeProxyRequestAttempt(proxyRequest); + res.__proxyResponseStreamMode = isClaudeStream ? (useRealStream ? "real" : "fake") : null; + this._updateTrackedRequest(requestId, { + isStreaming: isClaudeStream, + model, + path: proxyRequest.path, + requestCategory: "generation", + ...(isClaudeStream ? { streamMode: useRealStream ? "real" : "fake" } : {}), + }); - if (this.authSwitcher.failureCount > 0) { - this.logger.debug(`✅ [Auth] Claude request successful - failure count reset to 0`); - this.authSwitcher.failureCount = 0; - } + try { + // Create message queue inside try-catch to handle invalid authIndex + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + this._setupClientDisconnectHandler(res, requestId); - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - this.logger.info(`[Request] Claude streaming response (Real Mode) started...`); - await this._streamClaudeResponse(currentQueue, res, model); - } else { - // Claude Fake Stream / Non-Stream mode - let connectionMaintainer; - if (isClaudeStream) { - const scheduleNextKeepAlive = () => { - const randomInterval = 12000 + Math.floor(Math.random() * 6000); - connectionMaintainer = setTimeout(() => { - if (!res.headersSent) { - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); - } - if (!res.writableEnded) { - res.write("event: ping\ndata: {}\n\n"); - scheduleNextKeepAlive(); - } - }, randomInterval); - }; - scheduleNextKeepAlive(); - } + if (useRealStream) { + let currentQueue = messageQueue; + let initialMessage; + let skipFinalFailureSwitch = false; + const immediateSwitchTracker = this._createImmediateSwitchTracker(); - try { - const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + // eslint-disable-next-line no-constant-condition + while (true) { + this._getUsageStatsService()?.recordAttempt( + proxyRequest.request_id, + this.currentAuthIndex, + this._getAccountNameForIndex(this.currentAuthIndex) + ); + this._forwardRequest(proxyRequest); + initialMessage = await currentQueue.dequeue(); + + const initialStatus = Number(initialMessage?.status); + if ( + initialMessage.event_type === "error" && + !isUserAbortedError(initialMessage) && + Number.isFinite(initialStatus) && + this.config?.immediateSwitchStatusCodes?.includes(initialStatus) + ) { + this.logger.warn( + `[Request] Claude real stream received ${initialStatus}, switching account and retrying...` + ); + const switched = await this._performImmediateSwitchRetry( + initialMessage, + requestId, + immediateSwitchTracker + ); + if (!switched) { + skipFinalFailureSwitch = true; + break; + } - if (!result.success) { - this._logFinalRequestFailure(result.error, "Claude fake/non-stream"); - if (connectionMaintainer) clearTimeout(connectionMaintainer); - if (isClaudeStream && res.headersSent) { - // If keep-alives already started the SSE response, send an SSE error event instead of JSON. - this._handleClaudeRequestError(result.error, res); - } else { - this._sendClaudeErrorResponse( - res, - result.error.status || 500, - "api_error", - result.error.message + try { + currentQueue.close("retry_after_429"); + } catch { + /* empty */ + } + this._advanceProxyRequestAttempt(proxyRequest); + currentQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id ); + continue; } - if (!result.error.skipAccountSwitch && !this._isConnectionResetError(result.error)) { - await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); - } else if (result.error.skipAccountSwitch) { + + break; + } + + if (initialMessage.event_type === "error") { + this._logFinalRequestFailure(initialMessage, "Claude real stream"); + this._sendClaudeErrorResponse( + res, + initialMessage.status || 500, + "api_error", + initialMessage.message + ); + if (!skipFinalFailureSwitch && !this._isConnectionResetError(initialMessage)) { + await this.authSwitcher.handleRequestFailureAndSwitch(initialMessage, null); + } else if (skipFinalFailureSwitch) { this.logger.info( "[Request] Immediate-switch retries exhausted, skipping additional account switch." ); @@ -1680,237 +1839,330 @@ class RequestHandler { this.authSwitcher.failureCount = 0; } - // Use the queue that successfully received the initial message - const activeQueue = result.queue; - + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + this.logger.info(`[Request] Claude streaming response (Real Mode) started...`); + await this._streamClaudeResponse(currentQueue, res, model); + } else { + // Claude Fake Stream / Non-Stream mode + let connectionMaintainer; if (isClaudeStream) { - // Fake stream - if (!res.headersSent) { - res.status(200).set({ - "Cache-Control": "no-cache", - Connection: "keep-alive", - "Content-Type": "text/event-stream", - }); + const scheduleNextKeepAlive = () => { + const randomInterval = 12000 + Math.floor(Math.random() * 6000); + connectionMaintainer = setTimeout(() => { + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + } + if (!res.writableEnded) { + res.write("event: ping\ndata: {}\n\n"); + scheduleNextKeepAlive(); + } + }, randomInterval); + }; + scheduleNextKeepAlive(); + } + + try { + const result = await this._executeRequestWithRetries(proxyRequest, messageQueue); + + if (!result.success) { + this._logFinalRequestFailure(result.error, "Claude fake/non-stream"); + if (connectionMaintainer) clearTimeout(connectionMaintainer); + if (isClaudeStream && res.headersSent) { + // If keep-alives already started the SSE response, send an SSE error event instead of JSON. + this._handleClaudeRequestError(result.error, res); + } else { + this._sendClaudeErrorResponse( + res, + result.error.status || 500, + "api_error", + result.error.message + ); + } + if (!result.error.skipAccountSwitch && !this._isConnectionResetError(result.error)) { + await this.authSwitcher.handleRequestFailureAndSwitch(result.error, null); + } else if (result.error.skipAccountSwitch) { + this.logger.info( + "[Request] Immediate-switch retries exhausted, skipping additional account switch." + ); + } + return; } - if (connectionMaintainer) clearTimeout(connectionMaintainer); - this.logger.info(`[Request] Claude streaming response (Fake Mode) started...`); - let fullBody = ""; - let hadStreamError = false; - try { - // eslint-disable-next-line no-constant-condition - while (true) { - const message = await activeQueue.dequeue(this.timeouts.FAKE_STREAM); - if (message.type === "STREAM_END") { - break; - } + if (this.authSwitcher.failureCount > 0) { + this.logger.debug(`✅ [Auth] Claude request successful - failure count reset to 0`); + this.authSwitcher.failureCount = 0; + } - if (message.event_type === "error") { - this.logger.error( - `[Request] Error received during Claude fake stream: ${message.message}` - ); - hadStreamError = true; - // Check if response is still writable before attempting to write - if (this._isResponseWritable(res)) { - try { - res.write( - `event: error\ndata: ${JSON.stringify({ - error: { - message: message.message, - type: "api_error", - }, - type: "error", - })}\n\n` - ); - } catch (writeError) { - this.logger.debug( - `[Request] Failed to write error to Claude fake stream: ${writeError.message}` - ); + // Use the queue that successfully received the initial message + const activeQueue = result.queue; + + if (isClaudeStream) { + // Fake stream + if (!res.headersSent) { + res.status(200).set({ + "Cache-Control": "no-cache", + Connection: "keep-alive", + "Content-Type": "text/event-stream", + }); + } + if (connectionMaintainer) clearTimeout(connectionMaintainer); + + this.logger.info(`[Request] Claude streaming response (Fake Mode) started...`); + let fullBody = ""; + let hadStreamError = false; + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const message = await activeQueue.dequeue(this.timeouts.FAKE_STREAM); + if (message.type === "STREAM_END") { + break; + } + + if (message.event_type === "error") { + this.logger.error( + `[Request] Error received during Claude fake stream: ${message.message}` + ); + this._markTrackedResponseError(res, message.message, 500); + hadStreamError = true; + // Check if response is still writable before attempting to write + if (this._isResponseWritable(res)) { + try { + res.write( + `event: error\ndata: ${JSON.stringify({ + error: { + message: message.message, + type: "api_error", + }, + type: "error", + })}\n\n` + ); + } catch (writeError) { + this.logger.debug( + `[Request] Failed to write error to Claude fake stream: ${writeError.message}` + ); + } } + break; } - break; - } - if (message.data) fullBody += message.data; - } - if (hadStreamError) { - // Backend errored; don't attempt to translate/send a "normal" stream afterwards. - return; - } - const streamState = {}; - const translatedChunk = this.formatConverter.translateGoogleToClaudeStream( - fullBody, - model, - streamState - ); - if (this._isResponseWritable(res)) { - try { - if (translatedChunk) { - res.write(translatedChunk); + if (message.data) fullBody += message.data; + } + if (hadStreamError) { + // Backend errored; don't attempt to translate/send a "normal" stream afterwards. + return; + } + const streamState = {}; + const translatedChunk = this.formatConverter.translateGoogleToClaudeStream( + fullBody, + model, + streamState + ); + if (this._isResponseWritable(res)) { + try { + if (translatedChunk) { + res.write(translatedChunk); + } + } catch (writeError) { + this.logger.debug( + `[Request] Failed to write final fake Claude stream chunk: ${writeError.message}` + ); } - } catch (writeError) { + } else { this.logger.debug( - `[Request] Failed to write final fake Claude stream chunk: ${writeError.message}` + "[Request] Response no longer writable before final fake Claude stream chunk." ); } - } else { - this.logger.debug( - "[Request] Response no longer writable before final fake Claude stream chunk." - ); + this.logger.info("[Request] Claude fake mode: Complete content sent at once."); + } catch (error) { + // Classify error type and send appropriate response + this._handleFakeStreamError(error, res, "claude"); } - this.logger.info("[Request] Claude fake mode: Complete content sent at once."); - } catch (error) { - // Classify error type and send appropriate response - this._handleFakeStreamError(error, res, "claude"); + } else { + // Non-stream + await this._sendClaudeNonStreamResponse(activeQueue, res, model); } - } else { - // Non-stream - await this._sendClaudeNonStreamResponse(activeQueue, res, model); + } finally { + if (connectionMaintainer) clearTimeout(connectionMaintainer); } - } finally { - if (connectionMaintainer) clearTimeout(connectionMaintainer); } - } - } catch (error) { - // Handle queue timeout by notifying browser - this._handleQueueTimeout(error, requestId); + } catch (error) { + // Handle queue timeout by notifying browser + this._handleQueueTimeout(error, requestId); - this._handleClaudeRequestError(error, res); - } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (this.needsSwitchingAfterRequest) { - this.logger.info( - `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` - ); - this.authSwitcher.switchToNextAuth().catch(err => { - this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); - }); - this.needsSwitchingAfterRequest = false; + this._handleClaudeRequestError(error, res); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (this.needsSwitchingAfterRequest) { + this.logger.info( + `[Auth] Rotation count reached switching threshold (${this.authSwitcher.usageCount}/${this.config.switchOnUses}), will automatically switch account in background...` + ); + this.authSwitcher.switchToNextAuth().catch(err => { + this.logger.error(`[Auth] Background account switching task failed: ${err.message}`); + }); + this.needsSwitchingAfterRequest = false; + } + if (!res.writableEnded) res.end(); } - if (!res.writableEnded) res.end(); + } finally { + this._finalizeTrackedRequest(requestId, res); } } // Process Claude count tokens request async processClaudeCountTokens(req, res) { const requestId = this._generateRequestId(); + this._startTrackedRequest(requestId, req, { + apiFormat: "claude", + isStreaming: false, + requestCategory: "count_tokens", + streamMode: null, + }); - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } - - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res, { - sendError: (status, message) => this._sendClaudeErrorResponse(res, status, "overloaded_error", message), - }); - if (!ready) return; - } + try { + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; + } + + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res, { + sendError: (status, message) => + this._sendClaudeErrorResponse(res, status, "overloaded_error", message), + }); + if (!ready) return; + } - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } - // Translate Claude format to Google format - let googleBody, model; - try { - const result = await this.formatConverter.translateClaudeToGoogle(req.body); - googleBody = result.googleRequest; - model = result.cleanModelName; - } catch (error) { - this.logger.error(`[Adapter] Claude request translation failed: ${error.message}`); - return this._sendClaudeErrorResponse(res, 400, "invalid_request_error", "Invalid Claude request format."); - } - - // Build countTokens request - // Per Gemini API docs, countTokens accepts: - // - contents[] (simple mode) - // - generateContentRequest (full request with model, contents, tools, systemInstruction, etc.) - const countTokensBody = { - generateContentRequest: { - model: `models/${model}`, - ...googleBody, - }, - }; + // Translate Claude format to Google format + let googleBody, model; + try { + const result = await this.formatConverter.translateClaudeToGoogle(req.body); + googleBody = result.googleRequest; + model = result.cleanModelName; + } catch (error) { + this.logger.error(`[Adapter] Claude request translation failed: ${error.message}`); + return this._sendClaudeErrorResponse( + res, + 400, + "invalid_request_error", + "Invalid Claude request format." + ); + } - const proxyRequest = { - body: JSON.stringify(countTokensBody), - headers: { "Content-Type": "application/json" }, - is_generative: false, - method: "POST", - path: `/v1beta/models/${model}:countTokens`, - query_params: {}, - request_id: requestId, - }; - this._initializeProxyRequestAttempt(proxyRequest); + // Build countTokens request + // Per Gemini API docs, countTokens accepts: + // - contents[] (simple mode) + // - generateContentRequest (full request with model, contents, tools, systemInstruction, etc.) + const countTokensBody = { + generateContentRequest: { + model: `models/${model}`, + ...googleBody, + }, + }; - try { - // Create message queue inside try-catch to handle invalid authIndex - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + const proxyRequest = { + body: JSON.stringify(countTokensBody), + headers: { "Content-Type": "application/json" }, + is_generative: false, + method: "POST", + path: `/v1beta/models/${model}:countTokens`, + query_params: {}, + request_id: requestId, + }; + this._initializeProxyRequestAttempt(proxyRequest); + this._updateTrackedRequest(requestId, { + model, + path: proxyRequest.path, + requestCategory: "count_tokens", + }); - this._forwardRequest(proxyRequest); - const response = await messageQueue.dequeue(); + try { + // Create message queue inside try-catch to handle invalid authIndex + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id + ); + this._setupClientDisconnectHandler(res, requestId); - if (response.event_type === "error") { - this.logger.error( - `[Request] Received error from browser, will trigger switching logic. Status code: ${response.status}, message: ${response.message}` + this._getUsageStatsService()?.recordAttempt( + requestId, + this.currentAuthIndex, + this._getAccountNameForIndex(this.currentAuthIndex) ); - this._sendClaudeErrorResponse(res, response.status || 500, "api_error", response.message); - if (!this._isConnectionResetError(response)) { - await this.authSwitcher.handleRequestFailureAndSwitch(response, null); - } - return; - } + this._forwardRequest(proxyRequest); + const response = await messageQueue.dequeue(); - // For non-streaming requests, consume all chunks until STREAM_END - let fullBody = ""; - if (response.type !== "STREAM_END") { - if (response.data) fullBody += response.data; - // eslint-disable-next-line no-constant-condition - while (true) { - const message = await messageQueue.dequeue(); - if (message.type === "STREAM_END") { - break; + if (response.event_type === "error") { + this.logger.error( + `[Request] Received error from browser, will trigger switching logic. Status code: ${response.status}, message: ${response.message}` + ); + this._sendClaudeErrorResponse(res, response.status || 500, "api_error", response.message); + if (!this._isConnectionResetError(response)) { + await this.authSwitcher.handleRequestFailureAndSwitch(response, null); } - if (message.event_type === "error") { - this.logger.error(`[Request] Error received during count tokens: ${message.message}`); - return this._sendClaudeErrorResponse(res, 500, "api_error", message.message); + return; + } + + // For non-streaming requests, consume all chunks until STREAM_END + let fullBody = ""; + if (response.type !== "STREAM_END") { + if (response.data) fullBody += response.data; + // eslint-disable-next-line no-constant-condition + while (true) { + const message = await messageQueue.dequeue(); + if (message.type === "STREAM_END") { + break; + } + if (message.event_type === "error") { + this.logger.error(`[Request] Error received during count tokens: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); + return this._sendClaudeErrorResponse(res, 500, "api_error", message.message); + } + if (message.data) fullBody += message.data; } - if (message.data) fullBody += message.data; } - } - // Parse Gemini response - const geminiResponse = JSON.parse(fullBody || response.body); - const totalTokens = geminiResponse.totalTokens || 0; + // Parse Gemini response + const geminiResponse = JSON.parse(fullBody || response.body); + const totalTokens = geminiResponse.totalTokens || 0; - // Reset failure count on success - if (this.authSwitcher.failureCount > 0) { - this.logger.debug( - `✅ [Auth] Count tokens request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` - ); - this.authSwitcher.failureCount = 0; - } + // Reset failure count on success + if (this.authSwitcher.failureCount > 0) { + this.logger.debug( + `✅ [Auth] Count tokens request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` + ); + this.authSwitcher.failureCount = 0; + } - // Return Claude-compatible response - res.status(200).json({ - input_tokens: totalTokens, - }); + // Return Claude-compatible response + res.status(200).json({ + input_tokens: totalTokens, + }); - this.logger.info(`[Request] Claude count tokens completed: ${totalTokens} input tokens`); - } catch (error) { - this._handleClaudeRequestError(error, res); + this.logger.info(`[Request] Claude count tokens completed: ${totalTokens} input tokens`); + } catch (error) { + this._handleClaudeRequestError(error, res); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (!res.writableEnded) res.end(); + } } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (!res.writableEnded) res.end(); + this._finalizeTrackedRequest(requestId, res); } } @@ -1918,134 +2170,155 @@ class RequestHandler { // Mirrors OpenAI's /v1/responses/input_tokens by returning only the request-side token count. async processOpenAIResponseInputTokens(req, res) { const requestId = this._generateRequestId(); + this._startTrackedRequest(requestId, req, { + apiFormat: "response_api", + isStreaming: false, + requestCategory: "count_tokens", + streamMode: null, + }); - // Check current account's browser connection - if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { - this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); - const recovered = await this._handleBrowserRecovery(res); - if (!recovered) return; - } - - // Wait for system to become ready if it's busy - { - const ready = await this._waitForSystemAndConnectionIfBusy(res); - if (!ready) return; - } + try { + // Check current account's browser connection + if (!this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex)) { + this.logger.warn(`[Request] No WebSocket connection for current account #${this.currentAuthIndex}`); + const recovered = await this._handleBrowserRecovery(res); + if (!recovered) return; + } - if (this.browserManager) { - this.browserManager.notifyUserActivity(); - } + // Wait for system to become ready if it's busy + { + const ready = await this._waitForSystemAndConnectionIfBusy(res); + if (!ready) return; + } - // Translate OpenAI Response format to Google format (so we can use Gemini countTokens) - let googleBody, model; - try { - const result = await this.formatConverter.translateOpenAIResponseToGoogle(req.body); - googleBody = result.googleRequest; - model = result.cleanModelName; - } catch (error) { - this.logger.error(`[Adapter] OpenAI Response input_tokens translation failed: ${error.message}`); - return this._sendErrorResponse(res, 400, "Invalid OpenAI Response request format."); - } - - // Gemini countTokens accepts either: - // - contents[] - // - generateContentRequest (full request; required here because tools/systemInstruction/etc may be present) - const countTokensBody = { - generateContentRequest: { - model: `models/${model}`, - ...googleBody, - }, - }; + if (this.browserManager) { + this.browserManager.notifyUserActivity(); + } - const proxyRequest = { - body: JSON.stringify(countTokensBody), - headers: { "Content-Type": "application/json" }, - is_generative: false, - method: "POST", - path: `/v1beta/models/${model}:countTokens`, - query_params: {}, - request_id: requestId, - }; - this._initializeProxyRequestAttempt(proxyRequest); + // Translate OpenAI Response format to Google format (so we can use Gemini countTokens) + let googleBody, model; + try { + const result = await this.formatConverter.translateOpenAIResponseToGoogle(req.body); + googleBody = result.googleRequest; + model = result.cleanModelName; + } catch (error) { + this.logger.error(`[Adapter] OpenAI Response input_tokens translation failed: ${error.message}`); + return this._sendErrorResponse(res, 400, "Invalid OpenAI Response request format."); + } - try { - const messageQueue = this.connectionRegistry.createMessageQueue( - requestId, - this.currentAuthIndex, - proxyRequest.request_attempt_id - ); - this._setupClientDisconnectHandler(res, requestId); + // Gemini countTokens accepts either: + // - contents[] + // - generateContentRequest (full request; required here because tools/systemInstruction/etc may be present) + const countTokensBody = { + generateContentRequest: { + model: `models/${model}`, + ...googleBody, + }, + }; - this._forwardRequest(proxyRequest); - const response = await messageQueue.dequeue(); + const proxyRequest = { + body: JSON.stringify(countTokensBody), + headers: { "Content-Type": "application/json" }, + is_generative: false, + method: "POST", + path: `/v1beta/models/${model}:countTokens`, + query_params: {}, + request_id: requestId, + }; + this._initializeProxyRequestAttempt(proxyRequest); + this._updateTrackedRequest(requestId, { + model, + path: proxyRequest.path, + requestCategory: "count_tokens", + }); - if (response.event_type === "error") { - this.logger.error( - `[Request] Received error from browser for input_tokens, will trigger switching logic. Status code: ${response.status}, message: ${response.message}` + try { + const messageQueue = this.connectionRegistry.createMessageQueue( + requestId, + this.currentAuthIndex, + proxyRequest.request_attempt_id ); + this._setupClientDisconnectHandler(res, requestId); - this._sendErrorResponse(res, response.status || 500, response.message); + this._getUsageStatsService()?.recordAttempt( + requestId, + this.currentAuthIndex, + this._getAccountNameForIndex(this.currentAuthIndex) + ); + this._forwardRequest(proxyRequest); + const response = await messageQueue.dequeue(); - // Avoid switching account if the error is just a connection reset - if (!this._isConnectionResetError(response)) { - await this.authSwitcher.handleRequestFailureAndSwitch(response, null); - } else { - this.logger.info( - "[Request] Failure due to connection reset (input_tokens), skipping account switch." + if (response.event_type === "error") { + this.logger.error( + `[Request] Received error from browser for input_tokens, will trigger switching logic. Status code: ${response.status}, message: ${response.message}` ); - } - return; - } - // For non-streaming requests, consume all chunks until STREAM_END - let fullBody = ""; - if (response.type !== "STREAM_END") { - if (response.data) fullBody += response.data; - // eslint-disable-next-line no-constant-condition - while (true) { - const message = await messageQueue.dequeue(); - if (message.type === "STREAM_END") { - break; + this._sendErrorResponse(res, response.status || 500, response.message); + + // Avoid switching account if the error is just a connection reset + if (!this._isConnectionResetError(response)) { + await this.authSwitcher.handleRequestFailureAndSwitch(response, null); + } else { + this.logger.info( + "[Request] Failure due to connection reset (input_tokens), skipping account switch." + ); } - if (message.event_type === "error") { - this.logger.error(`[Request] Error received during input_tokens count: ${message.message}`); - this._sendErrorResponse(res, 500, message.message); - return; + return; + } + + // For non-streaming requests, consume all chunks until STREAM_END + let fullBody = ""; + if (response.type !== "STREAM_END") { + if (response.data) fullBody += response.data; + // eslint-disable-next-line no-constant-condition + while (true) { + const message = await messageQueue.dequeue(); + if (message.type === "STREAM_END") { + break; + } + if (message.event_type === "error") { + this.logger.error(`[Request] Error received during input_tokens count: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); + this._sendErrorResponse(res, 500, message.message); + return; + } + if (message.data) fullBody += message.data; } - if (message.data) fullBody += message.data; } - } - // Parse Gemini response - let geminiResponse; - try { - geminiResponse = JSON.parse(fullBody || response.body); - } catch (parseError) { - this.logger.error(`[Request] Failed to parse countTokens response: ${parseError.message}`); - this._sendErrorResponse(res, 500, "Failed to parse backend response"); - return; - } + // Parse Gemini response + let geminiResponse; + try { + geminiResponse = JSON.parse(fullBody || response.body); + } catch (parseError) { + this.logger.error(`[Request] Failed to parse countTokens response: ${parseError.message}`); + this._sendErrorResponse(res, 500, "Failed to parse backend response"); + return; + } - const totalTokens = geminiResponse.totalTokens || 0; + const totalTokens = geminiResponse.totalTokens || 0; - // Reset failure count on success - if (this.authSwitcher.failureCount > 0) { - this.logger.debug( - `✅ [Auth] input_tokens request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` - ); - this.authSwitcher.failureCount = 0; - } + // Reset failure count on success + if (this.authSwitcher.failureCount > 0) { + this.logger.debug( + `✅ [Auth] input_tokens request successful - failure count reset from ${this.authSwitcher.failureCount} to 0` + ); + this.authSwitcher.failureCount = 0; + } - res.status(200).json({ - input_tokens: totalTokens, - }); + res.status(200).json({ + input_tokens: totalTokens, + }); - this.logger.info(`[Request] OpenAI Response input_tokens completed: ${totalTokens} input tokens`); - } catch (error) { - this._handleRequestError(error, res); + this.logger.info(`[Request] OpenAI Response input_tokens completed: ${totalTokens} input tokens`); + } catch (error) { + this._handleRequestError(error, res); + } finally { + this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); + if (!res.writableEnded) res.end(); + } } finally { - this.connectionRegistry.removeMessageQueue(requestId, "request_complete"); - if (!res.writableEnded) res.end(); + this._finalizeTrackedRequest(requestId, res); } } @@ -2066,6 +2339,7 @@ class RequestHandler { if (message.event_type === "error") { this.logger.error(`[Request] Error received during Claude stream: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); // Attempt to send error event to client if headers allowed, then close // Check if response is still writable before attempting to write if (this._isResponseWritable(res)) { @@ -2162,6 +2436,7 @@ class RequestHandler { _sendClaudeErrorResponse(res, status, errorType, message) { if (!res.headersSent) { + this._markTrackedResponseError(res, message, status || 500); res.status(status) .type("application/json") .send( @@ -2184,6 +2459,7 @@ class RequestHandler { if (this._isConnectionResetError(error)) { const isClientDisconnect = error.reason === "client_disconnect" || !this._isResponseWritable(res); if (isClientDisconnect) { + this._markTrackedClientAbort(res, errorMsg); this.logger.info(`[Request] Request terminated: Queue closed (${error.reason || "connection_lost"})`); if (!res.writableEnded) { try { @@ -2207,16 +2483,21 @@ class RequestHandler { try { let errorType = "api_error"; let errorMessage = `Processing failed: ${errorMsg}`; + let errorStatus = 500; // Use precise error type checking instead of string matching if (error instanceof QueueTimeoutError || error.code === "QUEUE_TIMEOUT") { errorType = "timeout_error"; errorMessage = `Stream timeout: ${errorMsg}`; + errorStatus = 504; } else if (this._isConnectionResetError(error)) { errorType = "overloaded_error"; errorMessage = `Service unavailable: ${errorMsg}`; + errorStatus = 503; } + this._markTrackedResponseError(res, errorMessage, errorStatus); + res.write( `event: error\ndata: ${JSON.stringify({ error: { @@ -2343,6 +2624,7 @@ class RequestHandler { if (message.event_type === "error") { this.logger.error(`[Request] Error received during Gemini pseudo-stream: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); hadStreamError = true; this._handleRequestError({ message: message.message }, res, "gemini"); break; @@ -2544,6 +2826,12 @@ class RequestHandler { // eslint-disable-next-line no-constant-condition while (true) { + // Record attempt before forwarding, so failed attempts are also counted + this._getUsageStatsService()?.recordAttempt( + proxyRequest.request_id, + this.currentAuthIndex, + this._getAccountNameForIndex(this.currentAuthIndex) + ); this._forwardRequest(proxyRequest); headerMessage = await currentQueue.dequeue(); @@ -2636,6 +2924,7 @@ class RequestHandler { if (dataMessage.event_type === "error") { this.logger.error(`[Request] Error received during Gemini real stream: ${dataMessage.message}`); + this._markTrackedResponseError(res, dataMessage.message, 500); // Check if response is still writable before attempting to write if (this._isResponseWritable(res)) { try { @@ -2755,6 +3044,7 @@ class RequestHandler { if (message.event_type === "error") { this.logger.error(`[Request] Error received during Gemini non-stream: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); this._sendErrorResponse(res, 500, message.message); return; } @@ -2837,6 +3127,13 @@ class RequestHandler { const immediateSwitchTracker = this._createImmediateSwitchTracker(); while (retryAttempt <= this.maxRetries) { + // Record attempt at the start of each retry, before forwarding. + // This ensures failed attempts (e.g. 429 before any response) are also counted. + this._getUsageStatsService()?.recordAttempt( + proxyRequest.request_id, + this.currentAuthIndex, + this._getAccountNameForIndex(this.currentAuthIndex) + ); try { this._forwardRequest(proxyRequest); @@ -3014,6 +3311,7 @@ class RequestHandler { if (message.event_type === "error") { this.logger.error(`[Request] Error received during Response API stream: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); if (this._isResponseWritable(res)) { try { if (!streamState.sequenceNumber) streamState.sequenceNumber = 0; @@ -3099,6 +3397,7 @@ class RequestHandler { if (message.event_type === "error") { this.logger.error(`[Request] Error received during OpenAI stream: ${message.message}`); + this._markTrackedResponseError(res, message.message, 500); // Attempt to send error event to client if headers allowed, then close // Check if response is still writable before attempting to write if (this._isResponseWritable(res)) { @@ -3279,6 +3578,7 @@ class RequestHandler { if (this._isConnectionResetError(error)) { const isClientDisconnect = error.reason === "client_disconnect" || !this._isResponseWritable(res); if (isClientDisconnect) { + this._markTrackedClientAbort(res, errorMsg); this.logger.info(`[Request] Request terminated: Queue closed (${error.reason || "connection_lost"})`); if (!res.writableEnded) { try { @@ -3317,6 +3617,8 @@ class RequestHandler { errorMessage = `Service unavailable: ${errorMsg}`; } + this._markTrackedResponseError(res, errorMessage, errorCode); + if (format === "response_api") { if (res.__responseApiSeq == null) res.__responseApiSeq = 0; res.__responseApiSeq += 1; @@ -3400,6 +3702,7 @@ class RequestHandler { _sendErrorResponse(res, status, message) { if (!res.headersSent) { + this._markTrackedResponseError(res, message, status || 500); const errorPayload = { error: { code: status || 500, @@ -3427,6 +3730,7 @@ class RequestHandler { res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); } + this._markTrackedResponseError(res, message, 500); // Check if response is still writable before attempting to write if (this._isResponseWritable(res)) { try { @@ -3440,6 +3744,7 @@ class RequestHandler { _setupClientDisconnectHandler(res, requestId) { res.on("close", () => { if (!res.writableEnded) { + this._markTrackedClientAbort(res); this.logger.warn(`[Request] Client closed request #${requestId} connection prematurely.`); // Dynamically look up the current authIndex from the connection registry @@ -3587,7 +3892,6 @@ class RequestHandler { if (!bodyObj.generationConfig) { bodyObj.generationConfig = {}; } - if ( !bodyObj.generationConfig.thinkingConfig || bodyObj.generationConfig.thinkingConfig.includeThoughts === undefined @@ -3687,8 +3991,6 @@ class RequestHandler { this.logger.debug(`[Proxy] Debug: Final Gemini Request (Google Native) = ${JSON.stringify(bodyObj, null, 2)}`); - const effectiveStreamMode = modelStreamingMode || this.serverSystem.streamingMode; - return { body: req.method !== "GET" ? JSON.stringify(bodyObj) : undefined, headers: req.headers, @@ -3699,7 +4001,7 @@ class RequestHandler { path: cleanPath, query_params: req.query || {}, request_id: requestId, - streaming_mode: effectiveStreamMode, + streaming_mode: modelStreamingMode || this.serverSystem.streamingMode, }; } diff --git a/src/core/UsageStatsService.js b/src/core/UsageStatsService.js new file mode 100644 index 00000000..0915157b --- /dev/null +++ b/src/core/UsageStatsService.js @@ -0,0 +1,410 @@ +/** + * File: src/core/UsageStatsService.js + * Description: In-memory usage statistics and request history service + * + * Author: OpenAI Codex + */ + +const fs = require("fs"); +const path = require("path"); + +const DISPLAY_RECORDS_LIMIT = 1000; + +class UsageStatsService { + constructor(authSource, logger, dataDir) { + this.authSource = authSource; + this.logger = logger; + this.dataDir = dataDir || path.join(process.cwd(), "data"); + this.statsFilePath = path.join(this.dataDir, "usage-stats.jsonl"); + + // Ensure data directory exists + if (!fs.existsSync(this.dataDir)) { + fs.mkdirSync(this.dataDir, { recursive: true }); + } + + // Load persisted state + this._loadFromFile(); + + if (!this.startedAt) { + this.startedAtMs = Date.now(); + this.startedAt = new Date(this.startedAtMs).toISOString(); + } + if (!this.summary) { + this.summary = { + abortedCount: 0, + errorCount: 0, + successCount: 0, + totalDurationMs: 0, + totalRequests: 0, + }; + } + if (!this.activeRequests) { + this.activeRequests = new Map(); + } + if (!this.records) { + this.records = []; + } + if (!this.accountStats) { + this.accountStats = new Map(); + } + if (!this.formatStats) { + this.formatStats = new Map(); + } + if (!this.categoryStats) { + this.categoryStats = new Map(); + } + if (this.sequence === undefined) { + this.sequence = 0; + } + } + + startRequest(requestId, meta = {}) { + if (!requestId) return null; + + const tracker = { + apiFormat: meta.apiFormat || "unknown", + attemptCount: 0, + attempts: [], + clientIp: meta.clientIp || null, + initialAccountName: this._normalizeAccountName(meta.initialAccountName), + initialAuthIndex: this._normalizeAuthIndex(meta.initialAuthIndex), + isStreaming: Boolean(meta.isStreaming), + method: meta.method || "GET", + model: meta.model || null, + path: meta.path || "/", + requestCategory: meta.requestCategory || "request", + requestId, + startedAt: new Date().toISOString(), + startedAtMs: Date.now(), + streamMode: meta.streamMode || null, + }; + + this.activeRequests.set(requestId, tracker); + return tracker; + } + + updateRequest(requestId, patch = {}) { + const tracker = this.activeRequests.get(requestId); + if (!tracker) return; + + if (patch.apiFormat !== undefined) tracker.apiFormat = patch.apiFormat || tracker.apiFormat; + if (patch.clientIp !== undefined) tracker.clientIp = patch.clientIp || null; + if (patch.isStreaming !== undefined) tracker.isStreaming = Boolean(patch.isStreaming); + if (patch.method !== undefined) tracker.method = patch.method || tracker.method; + if (patch.model !== undefined) tracker.model = patch.model || null; + if (patch.path !== undefined) tracker.path = patch.path || tracker.path; + if (patch.requestCategory !== undefined) + tracker.requestCategory = patch.requestCategory || tracker.requestCategory; + if (patch.streamMode !== undefined) tracker.streamMode = patch.streamMode || null; + + if (patch.initialAuthIndex !== undefined) { + tracker.initialAuthIndex = this._normalizeAuthIndex(patch.initialAuthIndex); + } + if (patch.initialAccountName !== undefined) { + tracker.initialAccountName = this._normalizeAccountName(patch.initialAccountName); + } + } + + recordAttempt(requestId, authIndex, accountName = undefined) { + const tracker = this.activeRequests.get(requestId); + if (!tracker) return; + + const normalizedAuthIndex = this._normalizeAuthIndex(authIndex); + if (normalizedAuthIndex === null) return; + + const resolvedAccountName = + accountName !== undefined + ? this._normalizeAccountName(accountName) + : this._resolveAccountName(normalizedAuthIndex); + + this._pushAttempt(tracker, normalizedAuthIndex, resolvedAccountName); + } + + finishRequest(requestId, result = {}) { + const tracker = this.activeRequests.get(requestId); + if (!tracker) return null; + + this.activeRequests.delete(requestId); + + const finishedAtMs = Date.now(); + const lastAttempt = tracker.attempts[tracker.attempts.length - 1] || null; + const lastParsed = lastAttempt?.accountKey ? this._parseAccountKey(lastAttempt.accountKey) : {}; + const finalAuthIndex = + this._normalizeAuthIndex(result.finalAuthIndex) ?? lastParsed.authIndex ?? tracker.initialAuthIndex ?? null; + const finalAccountName = + this._normalizeAccountName(result.finalAccountName) ?? + lastParsed.accountName ?? + tracker.initialAccountName ?? + null; + const outcome = this._normalizeOutcome(result.outcome); + const statusCode = Number.isFinite(result.statusCode) ? Number(result.statusCode) : null; + const durationMs = Math.max(0, finishedAtMs - tracker.startedAtMs); + const accountKey = this._buildAccountKey(finalAuthIndex, finalAccountName); + + const record = { + accountKey, + apiFormat: tracker.apiFormat, + attemptCount: tracker.attemptCount, + attempts: tracker.attempts.map(item => ({ accountKey: item.accountKey })), + clientIp: tracker.clientIp, + durationMs, + errorMessage: result.errorMessage || null, + finalAccountName, + finalAuthIndex, + finishedAt: new Date(finishedAtMs).toISOString(), + initialAccountName: tracker.initialAccountName, + initialAuthIndex: tracker.initialAuthIndex, + isStreaming: tracker.isStreaming, + method: tracker.method, + model: tracker.model, + outcome, + path: tracker.path, + requestCategory: tracker.requestCategory, + requestId: tracker.requestId, + sequence: ++this.sequence, + startedAt: tracker.startedAt, + statusCode, + streamMode: tracker.requestCategory === "generation" ? tracker.streamMode || "non" : null, + }; + + this.records.push(record); + this._updateSummary(record); + this._updateBreakdown(this.formatStats, record.apiFormat); + this._updateBreakdown(this.categoryStats, record.requestCategory); + this._updateAccountStats(record); + + // Append record to file (one line per record) + this._appendRecord(record); + + return record; + } + + getSnapshot() { + const totalRequests = this.summary.totalRequests; + const avgDurationMs = totalRequests > 0 ? Math.round(this.summary.totalDurationMs / totalRequests) : 0; + const successRate = + totalRequests > 0 ? Number(((this.summary.successCount / totalRequests) * 100).toFixed(1)) : 0; + + const accounts = Array.from(this.accountStats.values()) + .map(item => ({ + abortedCount: item.abortedCount, + accountKey: item.accountKey, + accountName: item.accountName, + authIndex: item.authIndex, + avgDurationMs: item.totalRequests > 0 ? Math.round(item.totalDurationMs / item.totalRequests) : 0, + errorCount: item.errorCount, + lastPath: item.lastPath, + lastUsedAt: item.lastUsedAt, + modelCounts: Object.entries(item.modelCounts || {}) + .map(([key, count]) => ({ count, key })) + .sort((a, b) => b.count - a.count), + successCount: item.successCount, + successRate: + item.totalRequests > 0 ? Number(((item.successCount / item.totalRequests) * 100).toFixed(1)) : 0, + totalDurationMs: item.totalDurationMs, + totalRequests: item.totalRequests, + })) + .sort((a, b) => { + if (b.totalRequests !== a.totalRequests) return b.totalRequests - a.totalRequests; + return b.lastUsedAt.localeCompare(a.lastUsedAt); + }); + + return { + accounts, + // Return only recent records for display + records: this.records.slice(-DISPLAY_RECORDS_LIMIT).reverse(), + startedAt: this.startedAt, + summary: { + abortedCount: this.summary.abortedCount, + activeRequests: this.activeRequests.size, + avgDurationMs, + errorCount: this.summary.errorCount, + formatBreakdown: this._serializeBreakdown(this.formatStats), + requestCategoryBreakdown: this._serializeBreakdown(this.categoryStats), + successCount: this.summary.successCount, + successRate, + totalRequests, + uniqueAccountPairs: this.accountStats.size, + uptimeSeconds: Math.max(0, Math.floor((Date.now() - this.startedAtMs) / 1000)), + }, + }; + } + + _loadFromFile() { + try { + if (!fs.existsSync(this.statsFilePath)) return; + + const lines = fs.readFileSync(this.statsFilePath, "utf-8").split("\n").filter(Boolean); + this.records = []; + this.sequence = 0; + + for (const line of lines) { + try { + const record = JSON.parse(line); + this.records.push(record); + if (record.sequence > this.sequence) { + this.sequence = record.sequence; + } + } catch { + // Skip malformed lines + } + } + + // Recalculate aggregates from all loaded records + this._recalculateFromRecords(); + + if (this.logger) { + this.logger.info(`[UsageStats] Loaded ${this.records.length} records from ${this.statsFilePath}`); + } + } catch (err) { + if (this.logger) { + this.logger.warn(`[UsageStats] Failed to load stats file: ${err.message}`); + } + } + } + + _appendRecord(record) { + try { + fs.appendFileSync(this.statsFilePath, JSON.stringify(record) + "\n", "utf-8"); + } catch (err) { + if (this.logger) { + this.logger.warn(`[UsageStats] Failed to append record: ${err.message}`); + } + } + } + + _recalculateFromRecords() { + this.startedAtMs = Date.now(); + this.startedAt = new Date(this.startedAtMs).toISOString(); + this.summary = { + abortedCount: 0, + errorCount: 0, + successCount: 0, + totalDurationMs: 0, + totalRequests: 0, + }; + this.accountStats = new Map(); + this.formatStats = new Map(); + this.categoryStats = new Map(); + + for (const record of this.records) { + this._updateSummary(record); + this._updateBreakdown(this.formatStats, record.apiFormat); + this._updateBreakdown(this.categoryStats, record.requestCategory); + this._updateAccountStats(record); + } + } + + _pushAttempt(tracker, authIndex, accountName) { + const normalizedAccountName = this._normalizeAccountName(accountName); + const accountKey = this._buildAccountKey(authIndex, normalizedAccountName); + + tracker.attemptCount += 1; + tracker.attempts.push({ accountKey }); + } + + _updateSummary(record) { + this.summary.totalRequests += 1; + this.summary.totalDurationMs += record.durationMs; + + if (record.outcome === "success") { + this.summary.successCount += 1; + } else if (record.outcome === "aborted") { + this.summary.abortedCount += 1; + } else { + this.summary.errorCount += 1; + } + } + + _updateBreakdown(targetMap, key) { + const currentKey = key || "unknown"; + const existing = targetMap.get(currentKey) || { count: 0, key: currentKey }; + existing.count += 1; + targetMap.set(currentKey, existing); + } + + _updateAccountStats(record) { + const accountKey = record.accountKey || this._buildAccountKey(record.finalAuthIndex, record.finalAccountName); + const existing = this.accountStats.get(accountKey) || { + abortedCount: 0, + accountKey, + accountName: record.finalAccountName, + authIndex: record.finalAuthIndex, + errorCount: 0, + lastPath: null, + lastUsedAt: record.finishedAt, + modelCounts: {}, + successCount: 0, + totalDurationMs: 0, + totalRequests: 0, + }; + + existing.accountName = record.finalAccountName; + existing.authIndex = record.finalAuthIndex; + const modelKey = record.model || "unknown"; + existing.modelCounts[modelKey] = (existing.modelCounts[modelKey] || 0) + 1; + existing.lastPath = record.path || existing.lastPath; + existing.lastUsedAt = record.finishedAt; + existing.totalDurationMs += record.durationMs; + existing.totalRequests += 1; + + if (record.outcome === "success") { + existing.successCount += 1; + } else if (record.outcome === "aborted") { + existing.abortedCount += 1; + } else { + existing.errorCount += 1; + } + + this.accountStats.set(accountKey, existing); + } + + _serializeBreakdown(sourceMap) { + return Array.from(sourceMap.values()).sort((a, b) => b.count - a.count); + } + + _normalizeOutcome(outcome) { + if (outcome === "success" || outcome === "aborted") { + return outcome; + } + return "error"; + } + + _normalizeAuthIndex(value) { + return Number.isInteger(value) && value >= 0 ? value : null; + } + + _normalizeAccountName(value) { + if (typeof value !== "string") return null; + const trimmed = value.trim(); + return trimmed ? trimmed : null; + } + + _resolveAccountName(authIndex) { + if (!this.authSource?.accountNameMap || authIndex === null) return null; + return this._normalizeAccountName(this.authSource.accountNameMap.get(authIndex)); + } + + _buildAccountKey(authIndex, accountName) { + if (authIndex === null) { + return accountName ? `unassigned:${accountName}` : "unassigned"; + } + + return `${authIndex}:${accountName || "N/A"}`; + } + + _parseAccountKey(accountKey) { + if (!accountKey || accountKey === "unassigned") { + return { accountName: null, authIndex: null }; + } + const colonIdx = accountKey.indexOf(":"); + if (colonIdx === -1) { + return { accountName: accountKey, authIndex: null }; + } + const authIndex = Number(accountKey.slice(0, colonIdx)); + const accountName = accountKey.slice(colonIdx + 1); + return { accountName, authIndex: Number.isFinite(authIndex) ? authIndex : null }; + } +} + +module.exports = UsageStatsService; diff --git a/src/routes/StatusRoutes.js b/src/routes/StatusRoutes.js index 1e8353be..4bedac60 100644 --- a/src/routes/StatusRoutes.js +++ b/src/routes/StatusRoutes.js @@ -168,6 +168,30 @@ class StatusRoutes { res.json(this._getStatusData()); }); + app.get("/api/usage-stats", isAuthenticated, (req, res) => { + const snapshot = this.serverSystem.usageStatsService?.getSnapshot(); + res.json( + snapshot || { + accounts: [], + records: [], + startedAt: new Date().toISOString(), + summary: { + abortedCount: 0, + activeRequests: 0, + avgDurationMs: 0, + errorCount: 0, + formatBreakdown: [], + requestCategoryBreakdown: [], + successCount: 0, + successRate: 0, + totalRequests: 0, + uniqueAccountPairs: 0, + uptimeSeconds: 0, + }, + } + ); + }); + app.put("/api/accounts/current", isAuthenticated, async (req, res) => { try { if (this._rejectIfSystemBusy(res)) return; diff --git a/ui/app/pages/StatusPage.vue b/ui/app/pages/StatusPage.vue index a1df70ba..7161406f 100644 --- a/ui/app/pages/StatusPage.vue +++ b/ui/app/pages/StatusPage.vue @@ -78,6 +78,27 @@ + + +
+ + +
+
+
+ + + +
+
+ {{ t("requestRecords") }} + {{ filteredSummary.totalRequests }} +
+
+
+
+ + + + +
+
+ {{ t("successRate") }} + {{ filteredSummary.successRate }}% +
+
+
+
+ + + + +
+
+ {{ t("avgDuration") }} + {{ formatDuration(filteredSummary.avgDurationMs) }} +
+
+
+
+ + + +
+
+ {{ t("activeRequests") }} + {{ filteredSummary.activeRequests }} +
+
+
+
+ + + + + + +
+
+ {{ t("uniqueAccountPairs") }} + {{ filteredSummary.uniqueAccountPairs }} +
+
+
+ +
+
+
+

{{ t("requestSummary") }}

+
{{ filteredSummary.totalRequests }} {{ t("total") }}
+
+ + +
+
+ {{ t("success") }}: {{ filteredSummary.successCount }} + {{ t("failed") }}: {{ filteredSummary.errorCount }} + {{ t("aborted") }}: {{ filteredSummary.abortedCount }} +
+
+
+
+
+
+
+ +
+
+

{{ t("apiFormat") }}

+
+
+ + {{ translateLabel(item.key) }} + {{ item.count }} +
+
+
+
+

{{ t("streamingMode") }}

+
+
+ {{ translateLabel(item.key) }} + {{ item.count }} +
+
+
+
+
+
+ +
+
+
+

{{ t("accountUsageBreakdown") }}

+
+
+ {{ t("noUsageStats") }} +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + +
{{ t("account") }}{{ t("total") }}{{ t("success") }}{{ t("failed") }}{{ t("aborted") }}{{ t("successRate") }}{{ t("avgDuration") }}{{ t("modelUsage") }}
{{ item.totalRequests }}{{ item.successCount }}{{ item.errorCount }}{{ item.abortedCount }}{{ item.successRate }}%{{ formatDuration(item.avgDurationMs) }} +
+ + {{ mc.key }}: {{ mc.count }} + +
+
+
+
+
+ +
+
+
+

{{ t("requestRecords") }}

+ {{ t("recentToOldest") }} +
+
+ {{ t("noRequestRecords") }} +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
{{ t("requestTime") }}{{ t("requestId") }}{{ t("apiFormat") }}{{ t("streamingMode") }}{{ t("requestModel") }}{{ t("requestOutcome") }}{{ t("requestStatus") }}{{ t("requestDuration") }}{{ t("requestAccount") }}{{ t("requestAttempts") }}{{ t("requestIp") }}
{{ formatDateTime(record.startedAt) }}{{ record.requestId }}{{ translateLabel(record.apiFormat) }}{{ translateLabel(record.streamMode) }}{{ record.model || "-" }} + + {{ translateLabel(record.outcome) }} + + {{ record.statusCode ?? "-" }}{{ formatDuration(record.durationMs) }}{{ formatAccount(record.finalAuthIndex, record.finalAccountName) }} + {{ record.attemptCount }} + {{ record.clientIp || "-" }}
+
+
+
+
+