diff --git a/configs/config.json.example b/configs/config.json.example index f45655de4..1df5b9bfa 100644 --- a/configs/config.json.example +++ b/configs/config.json.example @@ -13,6 +13,7 @@ "PROMPT_LOG_MODE": "none", "REQUEST_MAX_RETRIES": 3, "REQUEST_BASE_DELAY": 1000, + "REQUEST_MAX_RETRY_TIME_MS": 30000, "CRON_NEAR_MINUTES": 1, "CRON_REFRESH_TOKEN": false, "PROVIDER_POOLS_FILE_PATH": "configs/provider_pools.json", diff --git a/src/providers/lmarena/lmarena-core.js b/src/providers/lmarena/lmarena-core.js new file mode 100644 index 000000000..c1dbfd5ee --- /dev/null +++ b/src/providers/lmarena/lmarena-core.js @@ -0,0 +1,287 @@ +/** + * LMArenaBridge Provider Core + * + * Forwards OpenAI-format chat requests to a running LMArenaBridge Python sidecar. + * LMArenaBridge (https://github.com/CloudWaddie/LMArenaBridge) exposes every model + * available on LMArena's platform (GPT-5, Claude Opus 4+, Gemini 3 Pro, etc.) as a + * single OpenAI-compatible endpoint. + * + * Configuration keys (per pool node): + * LMARENA_BRIDGE_URL Required. URL of the running LMArenaBridge sidecar. + * e.g. "http://localhost:8000" + * LMARENA_BRIDGE_API_KEY Optional. API key if the bridge requires authentication. + * LMARENA_MODEL_OVERRIDE Optional. Force a specific LMArena model for all requests. + * + * Shared retry settings (global config): + * REQUEST_MAX_RETRIES Max number of retry attempts (default: 3). + * REQUEST_BASE_DELAY Base delay in ms for exponential back-off (default: 1000). + * REQUEST_MAX_RETRY_TIME_MS Cap on total retry time in ms; retries stop once this is + * exceeded regardless of REQUEST_MAX_RETRIES (default: 30000). + * + * Setup: + * pip install lmarenabridge camoufox + * python -m lmarenabridge # or: lmarena-bridge --port 8000 + */ + +import axios from 'axios'; +import logger from '../../utils/logger.js'; +import * as http from 'http'; +import * as https from 'https'; +import { configureAxiosProxy, configureTLSSidecar } from '../../utils/proxy-utils.js'; +import { isRetryableNetworkError, MODEL_PROVIDER } from '../../utils/common.js'; +import { PROVIDER_MODELS } from '../provider-models.js'; + +const LMARENA_HEALTH_TIMEOUT_MS = 5000; +const LMARENA_REQUEST_TIMEOUT_MS = 120000; + +// Single source of truth for the model list lives in provider-models.js. +const _lmarenaModels = PROVIDER_MODELS['lmarena-bridge']; +if (!_lmarenaModels || _lmarenaModels.length === 0) { + logger.warn('[LMArena] No models found for lmarena-bridge in PROVIDER_MODELS. Check provider-models.js.'); +} +export const LMARENA_MODELS = _lmarenaModels || []; + +/** + * Parse an SSE byte stream into JSON objects. + * + * Handles: + * - JSON payloads split across multiple chunks (buffer accumulation) + * - Streams that end without a trailing newline (flush of remaining buffer) + * - The `[DONE]` sentinel that signals end-of-stream + * - Non-JSON `data:` lines (e.g. keep-alive pings) — these are silently skipped + * at debug level so logs stay clean in production + * + * @param {AsyncIterable} stream + * @yields {object} Parsed JSON chunks + */ +export async function* parseSSEStream(stream) { + let buffer = ''; + + for await (const chunk of stream) { + buffer += chunk.toString(); + let newlineIndex; + while ((newlineIndex = buffer.indexOf('\n')) !== -1) { + const line = buffer.substring(0, newlineIndex).trim(); + buffer = buffer.substring(newlineIndex + 1); + + if (!line.startsWith('data: ')) continue; + const jsonData = line.substring(6).trim(); + if (jsonData === '[DONE]') return; + + try { + yield JSON.parse(jsonData); + } catch { + // Non-JSON data lines are intentionally skipped (e.g. SSE comments, keep-alives). + logger.debug('[LMArena] Skipping non-JSON SSE line:', jsonData); + } + } + } + + // Flush any remaining buffer content after the stream ends (no trailing newline case) + const remaining = buffer.trim(); + if (remaining.length > 0 && remaining.startsWith('data: ')) { + const jsonData = remaining.substring(6).trim(); + if (jsonData !== '[DONE]') { + try { + yield JSON.parse(jsonData); + } catch { + logger.debug('[LMArena] Skipping non-JSON SSE line (flush):', jsonData); + } + } + } +} + +export class LMArenaApiService { + constructor(config) { + if (!config.LMARENA_BRIDGE_URL) { + throw new Error( + '[LMArena] LMARENA_BRIDGE_URL is required. ' + + 'Start the LMArenaBridge sidecar and set this to its base URL (e.g. http://localhost:8000).' + ); + } + + this.config = config; + this.baseUrl = config.LMARENA_BRIDGE_URL.replace(/\/$/, ''); + this.apiKey = config.LMARENA_BRIDGE_API_KEY || null; + this.modelOverride = config.LMARENA_MODEL_OVERRIDE || null; + this.isInitialized = false; + + const httpAgent = new http.Agent({ keepAlive: true, maxSockets: 50 }); + const httpsAgent = new https.Agent({ keepAlive: true, maxSockets: 50 }); + + const headers = { 'Content-Type': 'application/json' }; + if (this.apiKey) { + headers['Authorization'] = `Bearer ${this.apiKey}`; + } + + const axiosConfig = { + baseURL: this.baseUrl, + httpAgent, + httpsAgent, + headers, + proxy: false, + }; + + configureAxiosProxy(axiosConfig, config, MODEL_PROVIDER.LMARENA_BRIDGE); + this.axiosInstance = axios.create(axiosConfig); + } + + _applySidecar(axiosConfig) { + return configureTLSSidecar(axiosConfig, this.config, MODEL_PROVIDER.LMARENA_BRIDGE, this.baseUrl); + } + + /** + * Verify that the LMArenaBridge sidecar is reachable. + * Called during pool initialization. + */ + async initialize() { + try { + const axiosConfig = { method: 'get', url: '/health', timeout: LMARENA_HEALTH_TIMEOUT_MS }; + this._applySidecar(axiosConfig); + await this.axiosInstance.request(axiosConfig); + this.isInitialized = true; + logger.info(`[LMArena] Sidecar reachable at ${this.baseUrl}`); + } catch (err) { + // Sidecar not running — mark as uninitialized but do not crash. + // Requests will fail gracefully so the pool manager can rotate. + logger.warn(`[LMArena] Sidecar health check failed (${this.baseUrl}): ${err.message}`); + this.isInitialized = false; + } + return this.isInitialized; + } + + /** + * Ping the sidecar and update isInitialized status. + */ + async healthCheck() { + try { + const axiosConfig = { method: 'get', url: '/health', timeout: LMARENA_HEALTH_TIMEOUT_MS }; + this._applySidecar(axiosConfig); + await this.axiosInstance.request(axiosConfig); + this.isInitialized = true; + return true; + } catch { + this.isInitialized = false; + return false; + } + } + + /** + * Map a BAP model name to the LMArena model string. + * "lmarena-auto" lets the bridge pick any available arena model. + */ + _resolveModel(model) { + if (this.modelOverride) return this.modelOverride; + if (!model || model === 'lmarena-auto') return null; // bridge default + return model; + } + + async _callApi(body, isStream = false, retryCount = 0, retryStartTime = null) { + if (!this.isInitialized) { + await this.initialize(); + if (!this.isInitialized) { + const err = new Error('[LMArena] Sidecar is not available. Is it running?'); + err.shouldSwitchCredential = true; + throw err; + } + } + + const maxRetries = this.config.REQUEST_MAX_RETRIES ?? 3; + const baseDelay = this.config.REQUEST_BASE_DELAY ?? 1000; + const maxRetryTimeMs = this.config.REQUEST_MAX_RETRY_TIME_MS ?? 30000; + const startTime = retryStartTime ?? Date.now(); + + try { + const resolvedModel = this._resolveModel(body.model); + const payload = { ...body }; + if (resolvedModel !== null) payload.model = resolvedModel; + if (resolvedModel === null) delete payload.model; // bridge picks automatically + + const axiosConfig = { + method: 'post', + url: '/v1/chat/completions', + data: payload, + timeout: LMARENA_REQUEST_TIMEOUT_MS, + }; + if (isStream) { + axiosConfig.responseType = 'stream'; + } + this._applySidecar(axiosConfig); + const response = await this.axiosInstance.request(axiosConfig); + return response; + } catch (error) { + const status = error.response?.status; + const isNetworkError = isRetryableNetworkError(error); + const elapsed = Date.now() - startTime; + + if ((status === 503 || isNetworkError) && retryCount < maxRetries && elapsed < maxRetryTimeMs) { + const delay = baseDelay * Math.pow(2, retryCount); + logger.warn(`[LMArena] Retrying (attempt ${retryCount + 1}/${maxRetries}) after ${delay}ms`); + await new Promise(r => setTimeout(r, delay)); + return this._callApi(body, isStream, retryCount + 1, startTime); + } + + if (status === 429 || status === 401 || status === 403) { + error.shouldSwitchCredential = true; + } + if (status >= 500 && status < 600 && retryCount < maxRetries && elapsed < maxRetryTimeMs) { + const delay = baseDelay * Math.pow(2, retryCount); + logger.warn(`[LMArena] Server error ${status}, retrying in ${delay}ms`); + await new Promise(r => setTimeout(r, delay)); + return this._callApi(body, isStream, retryCount + 1, startTime); + } + + logger.error(`[LMArena] API error (status=${status || error.code}): ${error.message}`); + throw error; + } + } + + async generateContent(model, requestBody) { + // Strip internal BAP metadata fields + const body = { ...requestBody, model }; + delete body._monitorRequestId; + delete body._requestBaseUrl; + + const response = await this._callApi(body, false); + return response.data; + } + + async *generateContentStream(model, requestBody) { + const body = { ...requestBody, model }; + delete body._monitorRequestId; + delete body._requestBaseUrl; + + const response = await this._callApi(body, true); + yield* parseSSEStream(response.data); + } + + async listModels() { + try { + const axiosConfig = { + method: 'get', + url: '/v1/models', + timeout: LMARENA_HEALTH_TIMEOUT_MS, + }; + this._applySidecar(axiosConfig); + const response = await this.axiosInstance.request(axiosConfig); + return response.data; + } catch (err) { + logger.warn(`[LMArena] listModels failed: ${err.message}`); + // Fallback: return static model list + return { + object: 'list', + data: LMARENA_MODELS.map(id => ({ + id, + object: 'model', + created: 0, + owned_by: 'lmarena-bridge', + })), + }; + } + } + + isExpiryDateNear() { + // LMArenaBridge manages its own token refresh internally. + return false; + } +} diff --git a/src/providers/lmarena/lmarena-strategy.js b/src/providers/lmarena/lmarena-strategy.js new file mode 100644 index 000000000..4ec32ed47 --- /dev/null +++ b/src/providers/lmarena/lmarena-strategy.js @@ -0,0 +1,11 @@ +/** + * LMArena Provider Strategy + * + * Uses the OpenAI protocol strategy since LMArenaBridge exposes a + * fully OpenAI-compatible /v1/chat/completions endpoint. + * + * We re-export OpenAIStrategy so the gateway routing layer knows + * this provider speaks OpenAI protocol. + */ + +export { OpenAIStrategy as LMArenaStrategy } from '../openai/openai-strategy.js'; diff --git a/src/providers/provider-models.js b/src/providers/provider-models.js index be56f1278..0b2e9b0dd 100644 --- a/src/providers/provider-models.js +++ b/src/providers/provider-models.js @@ -132,6 +132,25 @@ export const PROVIDER_MODELS = { 'grok-imagine-1.0-edit', 'grok-imagine-1.0-fast', 'grok-imagine-1.0-fast-edit', + ], + 'lmarena-bridge': [ + 'lmarena-auto', + 'gpt-5', + 'gpt-5-mini', + 'claude-opus-4-7', + 'claude-opus-4-6', + 'claude-sonnet-4-6', + 'claude-opus-4-5', + 'gemini-3-pro', + 'gemini-3-flash', + 'gemini-2.5-pro', + 'gemini-2.5-flash', + 'grok-4.20', + 'grok-3', + 'deepseek-r2', + 'llama-4-scout', + 'llama-4-maverick', + 'mistral-large-3', ] }; diff --git a/src/utils/constants.js b/src/utils/constants.js index 0429b6af3..9f4bc48d6 100644 --- a/src/utils/constants.js +++ b/src/utils/constants.js @@ -65,5 +65,6 @@ export const MODEL_PROVIDER = { CODEX_API: 'openai-codex-oauth', FORWARD_API: 'forward-api', GROK_CUSTOM: 'grok-custom', + LMARENA_BRIDGE: 'lmarena-bridge', AUTO: 'auto', }; diff --git a/tests/lmarena-core.unit.test.js b/tests/lmarena-core.unit.test.js new file mode 100644 index 000000000..faf314b12 --- /dev/null +++ b/tests/lmarena-core.unit.test.js @@ -0,0 +1,280 @@ +/** + * Unit tests for lmarena-core.js + * + * Covers: + * 1. SSE parsing (parseSSEStream): + * - JSON payload split across multiple chunks (buffer accumulation) + * - Stream ending without trailing newline (flush behavior) + * - [DONE] sentinel terminates the stream + * - Non-JSON data: lines are skipped deterministically (no throw) + * 2. _callApi retry logic (LMArenaApiService): + * - Retries stop when REQUEST_MAX_RETRY_TIME_MS cap is exceeded + * - Last error is surfaced after time cap + * - Retries still respect REQUEST_MAX_RETRIES within the time cap + */ + +import { describe, test, expect, jest, beforeEach, afterEach } from '@jest/globals'; + +// --------------------------------------------------------------------------- +// Module mocks — must be declared before the module under test is imported +// (babel-jest hoists jest.mock() calls to the top of the file) +// --------------------------------------------------------------------------- + +jest.mock('../src/utils/logger.js', () => ({ + __esModule: true, + default: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }, +})); + +jest.mock('../src/utils/proxy-utils.js', () => ({ + configureAxiosProxy: jest.fn(), + configureTLSSidecar: jest.fn(cfg => cfg), +})); + +jest.mock('../src/utils/common.js', () => ({ + MODEL_PROVIDER: { LMARENA_BRIDGE: 'lmarena-bridge' }, + isRetryableNetworkError: (err) => + ['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'ENETUNREACH'].includes(err.code), +})); + +jest.mock('../src/providers/provider-models.js', () => ({ + PROVIDER_MODELS: { 'lmarena-bridge': ['lmarena-auto', 'gpt-4o'] }, +})); + +// --------------------------------------------------------------------------- +// Import the module under test AFTER mocks are defined +// --------------------------------------------------------------------------- + +import { parseSSEStream, LMArenaApiService } from '../src/providers/lmarena/lmarena-core.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** + * Build an async generator from an array of string/Buffer chunks, + * simulating a real network stream. + */ +async function* makeStream(...chunks) { + for (const chunk of chunks) { + yield Buffer.from(chunk); + } +} + +/** Collect all yielded values from an async generator into an array. */ +async function collect(gen) { + const items = []; + for await (const item of gen) { + items.push(item); + } + return items; +} + +/** Make a network-style error with a given code. */ +function makeNetworkError(code) { + const err = new Error(`mock network error: ${code}`); + err.code = code; + return err; +} + +/** Make an HTTP-style axios error with a given status code. */ +function makeHttpError(status) { + const err = new Error(`Request failed with status code ${status}`); + err.response = { status }; + return err; +} + +// --------------------------------------------------------------------------- +// SSE parsing tests (parseSSEStream) +// --------------------------------------------------------------------------- + +describe('parseSSEStream – SSE parsing behavior', () => { + test('parses a single complete SSE line', async () => { + const stream = makeStream('data: {"id":1}\n'); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 1 }]); + }); + + test('accumulates JSON payload split across multiple chunks', async () => { + // The JSON is split right in the middle; the parser must buffer correctly. + const stream = makeStream( + 'data: {"id"', // incomplete JSON, no newline yet + ':2,"tok":"hello"}\n' + ); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 2, tok: 'hello' }]); + }); + + test('flushes remaining buffer when stream ends without trailing newline', async () => { + // The last `data:` line has no trailing '\n' — the flush path must yield it. + const stream = makeStream( + 'data: {"id":3,"final":true}' // no trailing newline + ); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 3, final: true }]); + }); + + test('[DONE] terminates the stream and no further chunks are yielded', async () => { + const stream = makeStream( + 'data: {"id":4}\n', + 'data: [DONE]\n', + 'data: {"id":5}\n' // must NOT be yielded + ); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 4 }]); + }); + + test('[DONE] in flush (no trailing newline) terminates cleanly with no yield', async () => { + const stream = makeStream( + 'data: {"id":6}\n', + 'data: [DONE]' // no trailing newline — flush path + ); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 6 }]); + }); + + test('non-JSON data: line is skipped deterministically and does not throw', async () => { + // A keep-alive ping or SSE comment ("data: keep-alive") must be silently skipped, + // not cause an error or stop the stream. + const stream = makeStream( + 'data: {"id":7}\n', + 'data: keep-alive\n', // not valid JSON — must skip + 'data: {"id":8}\n' + ); + const results = await collect(parseSSEStream(stream)); + // Only the two valid JSON chunks should appear; the ping is silently dropped. + expect(results).toEqual([{ id: 7 }, { id: 8 }]); + }); + + test('non-JSON data: line in flush position is skipped without throwing', async () => { + // Same as above but the bad line is the last thing in the stream (flush path). + const stream = makeStream( + 'data: {"id":9}\n', + 'data: not-json' // flush path, not valid JSON + ); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 9 }]); + }); + + test('empty lines and non-data SSE fields are ignored', async () => { + const stream = makeStream( + '\n', + 'event: content\n', + 'id: 1\n', + 'data: {"id":10}\n', + '\n' + ); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 10 }]); + }); + + test('multiple JSON chunks in a single received buffer segment', async () => { + // Two complete `data:` lines arrive in one chunk. + const stream = makeStream('data: {"id":11}\ndata: {"id":12}\n'); + const results = await collect(parseSSEStream(stream)); + expect(results).toEqual([{ id: 11 }, { id: 12 }]); + }); +}); + +// --------------------------------------------------------------------------- +// Retry cap tests (LMArenaApiService._callApi) +// --------------------------------------------------------------------------- + +describe('LMArenaApiService._callApi – retry time cap', () => { + let service; + + beforeEach(() => { + jest.useFakeTimers(); + + // Build a minimal service instance that bypasses real HTTP. + // We set isInitialized=true and replace axiosInstance.request with a spy. + service = new LMArenaApiService({ LMARENA_BRIDGE_URL: 'http://localhost:8000' }); + service.isInitialized = true; + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + test('stops retrying and surfaces last error when REQUEST_MAX_RETRY_TIME_MS is exceeded', async () => { + // With a 1 s cap and base delay of 1 000 ms, the very first retry waits + // 1 000 ms, after which elapsed >= 1 000 ms cap, so no second retry happens. + service.config = { + ...service.config, + REQUEST_MAX_RETRIES: 10, // high count — cap should win + REQUEST_BASE_DELAY: 1000, // 1 s base delay + REQUEST_MAX_RETRY_TIME_MS: 1000, // 1 s total cap + }; + + const networkErr = makeNetworkError('ECONNRESET'); + service.axiosInstance = { request: jest.fn().mockRejectedValue(networkErr) }; + + // Attach the assertion handler BEFORE running timers to avoid unhandled rejection. + const assertionPromise = expect(service._callApi({ model: 'test' })) + .rejects.toMatchObject({ code: 'ECONNRESET' }); + + await jest.runAllTimersAsync(); + await assertionPromise; + + // Should have been called only a small number of times (1 initial + ≤ 1 retry) + expect(service.axiosInstance.request.mock.calls.length).toBeGreaterThanOrEqual(1); + expect(service.axiosInstance.request.mock.calls.length).toBeLessThanOrEqual(3); + }); + + test('retries up to REQUEST_MAX_RETRIES when safely within time cap', async () => { + // Cap is very large (1 hour); retry count limit of 2 should govern. + service.config = { + ...service.config, + REQUEST_MAX_RETRIES: 2, + REQUEST_BASE_DELAY: 100, + REQUEST_MAX_RETRY_TIME_MS: 3_600_000, // 1 hour — won't be hit + }; + + const networkErr = makeNetworkError('ETIMEDOUT'); + service.axiosInstance = { request: jest.fn().mockRejectedValue(networkErr) }; + + const assertionPromise = expect(service._callApi({ model: 'test' })) + .rejects.toMatchObject({ code: 'ETIMEDOUT' }); + + await jest.runAllTimersAsync(); + await assertionPromise; + + // 1 initial attempt + 2 retries = 3 total + expect(service.axiosInstance.request).toHaveBeenCalledTimes(3); + }); + + test('does not retry when REQUEST_MAX_RETRY_TIME_MS is 0', async () => { + service.config = { + ...service.config, + REQUEST_MAX_RETRIES: 5, + REQUEST_BASE_DELAY: 100, + REQUEST_MAX_RETRY_TIME_MS: 0, // immediate cap — no retries allowed + }; + + const networkErr = makeNetworkError('ECONNRESET'); + service.axiosInstance = { request: jest.fn().mockRejectedValue(networkErr) }; + + const assertionPromise = expect(service._callApi({ model: 'test' })) + .rejects.toMatchObject({ code: 'ECONNRESET' }); + + await jest.runAllTimersAsync(); + await assertionPromise; + + // Zero-time cap: elapsed(0) >= 0 → no retry, only 1 attempt. + expect(service.axiosInstance.request).toHaveBeenCalledTimes(1); + }); + + test('marks 429 error with shouldSwitchCredential regardless of retry cap', async () => { + service.config = { + ...service.config, + REQUEST_MAX_RETRIES: 3, + REQUEST_BASE_DELAY: 100, + REQUEST_MAX_RETRY_TIME_MS: 30000, + }; + + const httpErr = makeHttpError(429); + service.axiosInstance = { request: jest.fn().mockRejectedValue(httpErr) }; + + const thrown = await service._callApi({ model: 'test' }).catch(e => e); + expect(thrown.shouldSwitchCredential).toBe(true); + }); +});