-
Notifications
You must be signed in to change notification settings - Fork 552
Expand file tree
/
Copy pathproxy.ts
More file actions
2505 lines (2213 loc) · 85.2 KB
/
proxy.ts
File metadata and controls
2505 lines (2213 loc) · 85.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Local x402 Proxy Server
*
* Sits between OpenClaw's pi-ai (which makes standard OpenAI-format requests)
* and BlockRun's API (which requires x402 micropayments).
*
* Flow:
* pi-ai → http://localhost:{port}/v1/chat/completions
* → proxy forwards to https://blockrun.ai/api/v1/chat/completions
* → gets 402 → @x402/fetch signs payment → retries
* → streams response back to pi-ai
*
* Optimizations (v0.3.0):
* - SSE heartbeat: for streaming requests, sends headers + heartbeat immediately
* before the x402 flow, preventing OpenClaw's 10-15s timeout from firing.
* - Response dedup: hashes request bodies and caches responses for 30s,
* preventing double-charging when OpenClaw retries after timeout.
* - Payment cache: after first 402, pre-signs subsequent requests to skip
* the 402 round trip (~200ms savings per request).
* - Smart routing: when model is "blockrun/auto", classify query and pick cheapest model.
* - Usage logging: log every request as JSON line to ~/.openclaw/blockrun/logs/
*/
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
import { finished } from "node:stream";
import type { AddressInfo } from "node:net";
import { privateKeyToAccount } from "viem/accounts";
import { createPaymentFetch, type PreAuthParams } from "./x402.js";
import {
route,
getFallbackChain,
getFallbackChainFiltered,
calculateModelCost,
DEFAULT_ROUTING_CONFIG,
type RouterOptions,
type RoutingDecision,
type RoutingConfig,
type ModelPricing,
} from "./router/index.js";
import {
BLOCKRUN_MODELS,
OPENCLAW_MODELS,
resolveModelAlias,
getModelContextWindow,
isReasoningModel,
} from "./models.js";
import { logUsage, type UsageEntry } from "./logger.js";
import { getStats } from "./stats.js";
import { RequestDeduplicator } from "./dedup.js";
import { ResponseCache, type ResponseCacheConfig } from "./response-cache.js";
import { BalanceMonitor } from "./balance.js";
import { compressContext, shouldCompress, type NormalizedMessage } from "./compression/index.js";
// Error classes available for programmatic use but not used in proxy
// (universal free fallback means we don't throw balance errors anymore)
// import { InsufficientFundsError, EmptyWalletError } from "./errors.js";
import { USER_AGENT } from "./version.js";
import { SessionStore, getSessionId, type SessionConfig } from "./session.js";
import { checkForUpdates } from "./updater.js";
import { PROXY_PORT } from "./config.js";
import { SessionJournal } from "./journal.js";
// Base URL of the BlockRun API. The file header describes requests being
// forwarded beneath it (e.g. .../v1/chat/completions).
const BLOCKRUN_API = "https://blockrun.ai/api";
// Routing profile models - virtual models that trigger intelligent routing
// AUTO_MODEL is also the one ID that buildModelPricing() skips as a meta-model.
const AUTO_MODEL = "blockrun/auto";
// Every profile is listed twice: with and without the "blockrun/" prefix.
const ROUTING_PROFILES = new Set([
"blockrun/free",
"free",
"blockrun/eco",
"eco",
"blockrun/auto",
"auto",
"blockrun/premium",
"premium",
]);
const FREE_MODEL = "nvidia/gpt-oss-120b"; // Free model for empty wallet fallback
const MAX_MESSAGES = 200; // BlockRun API limit - truncate older messages if exceeded
const CONTEXT_LIMIT_KB = 5120; // Server-side limit: 5MB in KB
// 2 seconds. NOTE(review): presumably the spacing of the SSE heartbeats described
// in the file header - confirm at the usage site.
const HEARTBEAT_INTERVAL_MS = 2_000;
const DEFAULT_REQUEST_TIMEOUT_MS = 180_000; // 3 minutes (allows for on-chain tx + LLM response)
const MAX_FALLBACK_ATTEMPTS = 5; // Maximum models to try in fallback chain (increased from 3 to ensure cheap models are tried)
const HEALTH_CHECK_TIMEOUT_MS = 2_000; // Timeout for checking existing proxy
const RATE_LIMIT_COOLDOWN_MS = 60_000; // 60 seconds cooldown for rate-limited models
const PORT_RETRY_ATTEMPTS = 5; // Max attempts to bind port (handles TIME_WAIT)
const PORT_RETRY_DELAY_MS = 1_000; // Delay between retry attempts
/**
 * Rewrite a raw upstream x402 payment error body into a friendlier JSON error.
 *
 * Recognised shapes:
 *  - `error: "Payment verification failed"` whose `details` embed a JSON
 *    document with `invalidReason: "insufficient_funds"` and a parsable
 *    "insufficient balance: <have> < <need>" message;
 *  - the same wrapper with `invalidReason: "invalid_payload"`;
 *  - `error: "Settlement failed"`, or `details` containing "Settlement failed".
 *
 * Anything else - including bodies that are not valid JSON, or whose nested
 * JSON cannot be parsed - is returned unchanged.
 *
 * @param errorBody Raw error body received from upstream.
 * @returns A JSON string of the form `{ error: {...} }`, or `errorBody` untouched.
 */
function transformPaymentError(errorBody: string): string {
  try {
    const upstream = JSON.parse(errorBody) as {
      error?: string;
      details?: string;
    };

    // `details` carries a second JSON document: "Verification failed: {json}\n".
    const embedded =
      upstream.error === "Payment verification failed" && upstream.details
        ? upstream.details.match(/Verification failed:\s*(\{.*\})/s)
        : null;

    if (embedded) {
      const verdict = JSON.parse(embedded[1]) as {
        invalidMessage?: string;
        invalidReason?: string;
        payer?: string;
      };

      if (verdict.invalidReason === "insufficient_funds" && verdict.invalidMessage) {
        // Upstream phrases the shortfall as "insufficient balance: 251 < 11463";
        // both numbers are divided by 1,000,000 to obtain USD.
        const amounts = verdict.invalidMessage.match(
          /insufficient balance:\s*(\d+)\s*<\s*(\d+)/i,
        );
        if (amounts) {
          const microsToUsd = (digits: string): string =>
            (parseInt(digits, 10) / 1_000_000).toFixed(6);
          const currentUSD = microsToUsd(amounts[1]);
          const requiredUSD = microsToUsd(amounts[2]);
          const wallet = verdict.payer || "unknown";
          // Long addresses are abbreviated to "0x1234...abcd" in the help text only.
          const shortWallet =
            wallet.length > 12 ? `${wallet.slice(0, 6)}...${wallet.slice(-4)}` : wallet;
          return JSON.stringify({
            error: {
              message: `Insufficient USDC balance. Current: $${currentUSD}, Required: ~$${requiredUSD}`,
              type: "insufficient_funds",
              wallet: wallet,
              current_balance_usd: currentUSD,
              required_usd: requiredUSD,
              help: `Fund wallet ${shortWallet} with USDC on Base, or use free model: /model free`,
            },
          });
        }
      }

      // Signature problems / malformed payment payloads.
      if (verdict.invalidReason === "invalid_payload") {
        return JSON.stringify({
          error: {
            message: "Payment signature invalid. This may be a temporary issue.",
            type: "invalid_payload",
            help: "Try again. If this persists, reinstall ClawRouter: curl -fsSL https://blockrun.ai/ClawRouter-update | bash",
          },
        });
      }
    }

    // Settlement failures (gas estimation, on-chain errors).
    const isSettlementFailure =
      upstream.error === "Settlement failed" || upstream.details?.includes("Settlement failed");
    if (isSettlementFailure) {
      const details = upstream.details || "";
      const message = details.includes("unable to estimate gas")
        ? "Payment failed: network congestion or gas issue. Try again."
        : "Payment settlement failed. Try again in a moment.";
      return JSON.stringify({
        error: {
          message,
          type: "settlement_failed",
          help: "This is usually temporary. If it persists, try: /model free",
        },
      });
    }
  } catch {
    // Unparseable input: fall through and hand back the original body.
  }
  return errorBody;
}
/**
 * Models that recently hit an upstream rate limit.
 * Key: model ID. Value: the `Date.now()` timestamp (ms) recorded when the limit was hit.
 */
const rateLimitedModels = new Map<string, number>();

/**
 * Report whether a model is still inside its rate-limit cooldown window.
 *
 * Side effect: an entry whose cooldown (RATE_LIMIT_COOLDOWN_MS) has elapsed is
 * removed from `rateLimitedModels` before returning `false`.
 *
 * @param modelId Model ID to look up.
 * @returns `true` while the cooldown is running, otherwise `false`.
 */
function isRateLimited(modelId: string): boolean {
  const markedAt = rateLimitedModels.get(modelId);
  if (!markedAt) return false;

  const coolingDown = Date.now() - markedAt < RATE_LIMIT_COOLDOWN_MS;
  if (!coolingDown) {
    // Cooldown is over: forget the model so later lookups take the fast path.
    rateLimitedModels.delete(modelId);
  }
  return coolingDown;
}
/**
 * Mark a model as rate-limited.
 *
 * Records the current time in `rateLimitedModels` and logs the cooldown length.
 * The duration in the log line is derived from RATE_LIMIT_COOLDOWN_MS so the
 * message cannot drift from the actual cooldown if the constant is changed.
 *
 * @param modelId Model ID that just hit a rate limit.
 */
function markRateLimited(modelId: string): void {
  rateLimitedModels.set(modelId, Date.now());
  const cooldownSeconds = Math.round(RATE_LIMIT_COOLDOWN_MS / 1000);
  console.log(
    `[ClawRouter] Model ${modelId} rate-limited, will deprioritize for ${cooldownSeconds}s`,
  );
}
/**
 * Stable partition of a model list: models that are not rate-limited keep
 * their relative order at the front, rate-limited ones follow in their
 * original relative order.
 *
 * Each model is checked exactly once, in input order, because isRateLimited()
 * is time-dependent and prunes expired entries as a side effect.
 *
 * @param models Candidate model IDs.
 * @returns A new array; the input is not modified.
 */
function prioritizeNonRateLimited(models: string[]): string[] {
  const deferred: string[] = [];
  const ready = models.filter((modelId) => {
    if (!isRateLimited(modelId)) return true;
    deferred.push(modelId);
    return false;
  });
  return ready.concat(deferred);
}
/**
 * Decide whether it is still safe to write to a response (guards against
 * write-after-close errors).
 *
 * A response is writable only when it has not been ended or destroyed, and
 * its socket exists, is not destroyed, and reports itself writable.
 *
 * @param res The server response to inspect.
 * @returns `true` only if every one of the conditions above holds.
 */
function canWrite(res: ServerResponse): boolean {
  if (res.writableEnded || res.destroyed) return false;

  const socket = res.socket;
  if (socket === null) return false;

  return !socket.destroyed && socket.writable;
}
/**
 * Write to a response only if canWrite() says the socket is still usable.
 *
 * @param res  Response to write to.
 * @param data Chunk to send.
 * @returns `false` without writing when the response is not writable;
 *          otherwise whatever `res.write(data)` returns. NOTE(review): per the
 *          Node.js stream docs, `write()` returning `false` signals
 *          backpressure (the chunk was queued), not a failed write - callers
 *          should not treat it as data loss. No draining is done here.
 */
function safeWrite(res: ServerResponse, data: string | Buffer): boolean {
  return canWrite(res) ? res.write(data) : false;
}
// Extra multiplier applied during the balance check, on top of the 20% buffer
// that estimateAmount() already folds into its result (the `* 1.2` factor).
// Total effective buffer: 1.2 * 1.5 = 1.8x (80% safety margin)
// This prevents x402 payment failures after streaming headers are sent,
// which would trigger OpenClaw's 5-24 hour billing cooldown.
const BALANCE_CHECK_BUFFER = 1.5;
/**
 * Get the proxy port from pre-loaded configuration.
 *
 * Simply returns `PROXY_PORT` as imported from ./config.js. Per the original
 * note the port is validated at module load time; that validation lives in
 * the config module and is not visible from this file.
 *
 * @returns The configured proxy port.
 */
export function getProxyPort(): number {
return PROXY_PORT;
}
/**
 * Check if a proxy is already running on the given port.
 *
 * Sends `GET http://127.0.0.1:{port}/health` and expects a JSON body of the
 * form `{ status: "ok", wallet: "<address>" }`.
 *
 * The abort timer stays armed until the body has been read (it is cleared in
 * `finally`), so a peer that sends headers and then stalls the body cannot
 * hang the check beyond HEALTH_CHECK_TIMEOUT_MS.
 *
 * @param port Local port to probe.
 * @returns The wallet address reported by the running proxy, or `undefined`
 *          if nothing answered, the answer was not a healthy proxy response,
 *          or the probe timed out / failed for any reason.
 */
async function checkExistingProxy(port: number): Promise<string | undefined> {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), HEALTH_CHECK_TIMEOUT_MS);
  try {
    const response = await fetch(`http://127.0.0.1:${port}/health`, {
      signal: controller.signal,
    });
    if (!response.ok) return undefined;

    // The body is untrusted: it may be `null`, or carry non-string fields.
    const data = (await response.json()) as { status?: unknown; wallet?: unknown } | null;
    if (data?.status === "ok" && typeof data.wallet === "string" && data.wallet) {
      return data.wallet;
    }
    return undefined;
  } catch {
    // Connection refused, timeout/abort, or invalid JSON: treat as "not running".
    return undefined;
  } finally {
    clearTimeout(timeoutId);
  }
}
/**
 * Error patterns that indicate a provider-side issue (not user's fault).
 * These errors should trigger fallback to the next model in the chain.
 *
 * All patterns are case-insensitive and carry no `g` flag, so `.test()` keeps
 * no state between calls.
 */
const PROVIDER_ERROR_PATTERNS = [
// Billing / quota
/billing/i,
/insufficient.*balance/i,
/credits/i,
/quota.*exceeded/i,
/rate.*limit/i,
// Availability / capacity
/model.*unavailable/i,
/model.*not.*available/i,
/service.*unavailable/i,
/capacity/i,
/overloaded/i,
/temporarily.*unavailable/i,
// Provider-side credentials
/api.*key.*invalid/i,
/authentication.*failed/i,
// Request size
/request too large/i,
/request.*size.*exceeds/i,
/payload too large/i,
];
/**
 * "Successful" response bodies that are actually provider degradation placeholders.
 * Some upstream providers occasionally return these with HTTP 200.
 *
 * Used by detectDegradedSuccessResponse() against both the raw body and the
 * extracted assistant content.
 */
const DEGRADED_RESPONSE_PATTERNS = [
/the ai service is temporarily overloaded/i,
/service is temporarily overloaded/i,
/please try again in a moment/i,
];
/**
 * Known low-quality loop signatures seen during provider degradation windows.
 *
 * hasKnownLoopSignature() requires at least two of these to match before it
 * treats a text as degraded on signature alone.
 */
const DEGRADED_LOOP_PATTERNS = [
/the boxed is the response\./i,
/the response is the text\./i,
/the final answer is the boxed\./i,
];
/**
 * Pull `choices[0].message.content` out of an OpenAI-style completion payload.
 *
 * The payload is untrusted, so every level is checked before descending.
 *
 * @param payload Parsed response body of unknown shape.
 * @returns The content when it is a string (including the empty string);
 *          `undefined` for any other shape, e.g. array-valued content.
 */
function extractAssistantContent(payload: unknown): string | undefined {
  // Narrow "non-null object" once instead of repeating the check per level.
  const asRecord = (value: unknown): Record<string, unknown> | undefined =>
    value && typeof value === "object" ? (value as Record<string, unknown>) : undefined;

  const choices = asRecord(payload)?.choices;
  if (!Array.isArray(choices) || choices.length === 0) return undefined;

  const content = asRecord(asRecord(choices[0])?.message)?.content;
  return typeof content === "string" ? content : undefined;
}
/**
 * Heuristically detect "stuck in a loop" model output.
 *
 * Two independent signals:
 *  1. at least two of the DEGRADED_LOOP_PATTERNS signatures match; or
 *  2. the text has 8+ non-empty lines, some line repeats 3+ times, and at
 *     most 45% of the lines are unique.
 *
 * The highest repeat count is tracked while counting rather than computed
 * with `Math.max(...counts.values())`: spreading a very large collection into
 * call arguments throws a RangeError, which would surface for bodies with a
 * very large number of distinct lines.
 *
 * @param text Plain text to inspect (raw body or assistant content).
 * @returns `true` when the text looks like degraded, repetitive output.
 */
function hasKnownLoopSignature(text: string): boolean {
  const matchCount = DEGRADED_LOOP_PATTERNS.reduce(
    (count, pattern) => (pattern.test(text) ? count + 1 : count),
    0,
  );
  if (matchCount >= 2) return true;

  // Generic repetitive loop fallback for short repeated lines.
  const lines = text
    .split(/\r?\n/)
    .map((line) => line.trim())
    .filter(Boolean);
  if (lines.length < 8) return false;

  const counts = new Map<string, number>();
  let maxRepeat = 0;
  for (const line of lines) {
    const occurrences = (counts.get(line) ?? 0) + 1;
    counts.set(line, occurrences);
    if (occurrences > maxRepeat) maxRepeat = occurrences;
  }
  const uniqueRatio = counts.size / lines.length;
  return maxRepeat >= 3 && uniqueRatio <= 0.45;
}
/**
 * Inspect a nominally successful (HTTP 200) response body and decide whether
 * it is really a degraded answer that should trigger model fallback.
 *
 * Checks, in order:
 *  1. the raw text against the overloaded-placeholder patterns;
 *  2. the raw text for repetitive-loop output;
 *  3. if the text parses as JSON: an `error` field (string, or object with
 *     string `message` / `type` / `code`) matching a provider error pattern;
 *  4. if the text parses as JSON: the assistant content against checks 1 and 2.
 *
 * Anything thrown while handling the JSON form (invalid JSON, a `null`
 * document, ...) is swallowed and reported as "not degraded".
 *
 * @param body Raw response body.
 * @returns A short reason string when fallback should happen, otherwise `undefined`.
 */
export function detectDegradedSuccessResponse(body: string): string | undefined {
  const text = body.trim();
  if (text.length === 0) return undefined;

  const looksLikePlaceholder = (candidate: string): boolean =>
    DEGRADED_RESPONSE_PATTERNS.some((pattern) => pattern.test(candidate));

  // Plain-text checks run on the raw body regardless of its format.
  if (looksLikePlaceholder(text)) {
    return "degraded response: overloaded placeholder";
  }
  if (hasKnownLoopSignature(text)) {
    return "degraded response: repetitive loop output";
  }

  try {
    const envelope = JSON.parse(text) as Record<string, unknown>;

    // Some providers return JSON error payloads with HTTP 200.
    const rawError = envelope.error;
    let errorText = "";
    if (typeof rawError === "string") {
      errorText = rawError;
    } else if (rawError && typeof rawError === "object") {
      const fields = rawError as Record<string, unknown>;
      errorText = [fields.message, fields.type, fields.code]
        .filter((part): part is string => typeof part === "string" && part !== "")
        .join(" ");
    }
    if (errorText && PROVIDER_ERROR_PATTERNS.some((pattern) => pattern.test(errorText))) {
      return `degraded response: ${errorText.slice(0, 120)}`;
    }

    // Well-formed wrapper whose assistant message is itself a placeholder or loop.
    const assistantContent = extractAssistantContent(envelope);
    if (assistantContent) {
      if (looksLikePlaceholder(assistantContent)) {
        return "degraded response: overloaded assistant content";
      }
      if (hasKnownLoopSignature(assistantContent)) {
        return "degraded response: repetitive assistant loop";
      }
    }
  } catch {
    // Not JSON - the plain-text checks above already covered it.
  }
  return undefined;
}
/**
 * HTTP status codes that may indicate a provider-side problem worth retrying
 * against the next model in the fallback chain.
 */
const FALLBACK_STATUS_CODES = [
  400, // Bad request - sometimes used for billing errors
  401, // Unauthorized - provider API key issues
  402, // Payment required - but from upstream, not x402
  403, // Forbidden - provider restrictions
  413, // Payload too large - request exceeds model's context limit
  429, // Rate limited
  500, // Internal server error
  502, // Bad gateway
  503, // Service unavailable
  504, // Gateway timeout
];

/**
 * Decide whether an upstream error response should trigger model fallback.
 *
 * Only statuses listed in FALLBACK_STATUS_CODES qualify. Listed 5xx statuses
 * always do; listed 4xx statuses qualify only when the body matches one of
 * the PROVIDER_ERROR_PATTERNS.
 *
 * @param status HTTP status code of the upstream response.
 * @param body   Response body text.
 * @returns `true` when the next model in the chain should be tried.
 */
function isProviderError(status: number, body: string): boolean {
  const isCandidate = FALLBACK_STATUS_CODES.includes(status);
  if (!isCandidate) return false;

  return status >= 500 || PROVIDER_ERROR_PATTERNS.some((pattern) => pattern.test(body));
}
/**
 * Valid message roles for OpenAI-compatible APIs.
 * Some clients send non-standard roles (e.g., "developer" instead of "system").
 */
const VALID_ROLES = new Set(["system", "user", "assistant", "tool", "function"]);
/**
 * Role mappings for non-standard roles.
 * Maps client-specific roles to standard OpenAI roles.
 *
 * NOTE: this is a plain object literal, so indexing it with an arbitrary
 * client-supplied string also sees inherited Object.prototype members
 * (e.g. "constructor", "toString"). Lookups on untrusted keys should check
 * own-property membership first.
 */
const ROLE_MAPPINGS: Record<string, string> = {
developer: "system", // OpenAI's newer API uses "developer" for system messages
model: "assistant", // Some APIs use "model" instead of "assistant"
};
// Minimal chat message shape shared by the normalizers in this file.
// NOTE: `string | unknown` collapses to `unknown` for the type checker, so
// `content` must be narrowed (typeof / Array.isArray) before use.
type ChatMessage = { role: string; content: string | unknown };
/**
 * Anthropic tool ID pattern: only alphanumeric, underscore, and hyphen allowed.
 * Error: "messages.X.content.Y.tool_use.id: String should match pattern '^[a-zA-Z0-9_-]+$'"
 */
const VALID_TOOL_ID_PATTERN = /^[a-zA-Z0-9_-]+$/;

/**
 * Make a tool ID conform to VALID_TOOL_ID_PATTERN.
 *
 * @param id Tool ID as received from the client.
 * @returns The ID with every character outside `[a-zA-Z0-9_-]` replaced by an
 *          underscore. Empty, missing, non-string and already-valid IDs are
 *          returned as-is.
 */
function sanitizeToolId(id: string | undefined): string | undefined {
  // Nothing to sanitize: absent, empty, or (at runtime) not actually a string.
  if (typeof id !== "string" || id.length === 0) return id;

  return VALID_TOOL_ID_PATTERN.test(id) ? id : id.replace(/[^a-zA-Z0-9_-]/g, "_");
}
/**
 * Type for messages with tool calls (OpenAI format).
 *
 * As used by sanitizeToolIds(): `tool_calls` carries the per-call `id`s on
 * assistant messages, and `tool_call_id` is the back-reference on tool
 * messages. Both are optional because any ChatMessage is viewed through this
 * type.
 */
type MessageWithTools = ChatMessage & {
tool_calls?: Array<{ id?: string; type?: string; function?: unknown }>;
tool_call_id?: string;
};
/**
 * Type for content blocks that may contain tool IDs (Anthropic format in OpenAI wrapper).
 *
 * As used by sanitizeToolIds(): `id` is read on `type: "tool_use"` blocks and
 * `tool_use_id` on `type: "tool_result"` blocks. The index signature lets all
 * other block fields pass through untouched.
 */
type ContentBlock = {
type?: string;
id?: string;
tool_use_id?: string;
[key: string]: unknown;
};
/**
 * Sanitize every tool ID found in a message list so it matches Anthropic's
 * ID pattern (see sanitizeToolId).
 *
 * Locations covered:
 *  - `tool_calls[].id` (OpenAI assistant messages)
 *  - `tool_call_id` (OpenAI tool messages)
 *  - `content[]` blocks of type "tool_use" (`id`) and "tool_result" (`tool_use_id`)
 *
 * The input is never mutated. Messages, tool calls and content blocks that
 * need no change are passed through by reference, and when nothing at all
 * changed the original array itself is returned.
 *
 * @param messages Chat messages in OpenAI format.
 * @returns The sanitized messages (possibly the same array).
 */
function sanitizeToolIds(messages: ChatMessage[]): ChatMessage[] {
  if (!messages || messages.length === 0) return messages;

  // Returns a rewritten copy of the block, or null when it needs no change.
  const rewriteBlock = (block: ContentBlock): ContentBlock | null => {
    let rewritten: ContentBlock | null = null;

    // tool_use blocks carry their ID in "id"
    if (block.type === "tool_use" && block.id && typeof block.id === "string") {
      const safeId = sanitizeToolId(block.id);
      if (safeId !== block.id) {
        rewritten = { ...block, id: safeId };
      }
    }

    // tool_result blocks refer back via "tool_use_id"
    if (
      block.type === "tool_result" &&
      block.tool_use_id &&
      typeof block.tool_use_id === "string"
    ) {
      const safeId = sanitizeToolId(block.tool_use_id);
      if (safeId !== block.tool_use_id) {
        rewritten = { ...(rewritten ?? block), tool_use_id: safeId };
      }
    }

    return rewritten;
  };

  let anyMessageChanged = false;

  const result = messages.map((original) => {
    const source = original as MessageWithTools;
    let patched: MessageWithTools = source; // only ever replaced by copies
    let dirty = false;

    // tool_calls[].id on assistant messages
    if (source.tool_calls && Array.isArray(source.tool_calls)) {
      const calls = source.tool_calls.map((call) => {
        if (!call.id || typeof call.id !== "string") return call;
        const safeId = sanitizeToolId(call.id);
        if (safeId === call.id) return call;
        dirty = true;
        return { ...call, id: safeId };
      });
      if (dirty) {
        patched = { ...patched, tool_calls: calls };
      }
    }

    // tool_call_id on tool messages
    if (source.tool_call_id && typeof source.tool_call_id === "string") {
      const safeId = sanitizeToolId(source.tool_call_id);
      if (safeId !== source.tool_call_id) {
        dirty = true;
        patched = { ...patched, tool_call_id: safeId };
      }
    }

    // Anthropic-style array content
    if (Array.isArray(source.content)) {
      const blocks = (source.content as ContentBlock[]).map((block) => {
        if (!block || typeof block !== "object") return block;
        const rewritten = rewriteBlock(block);
        if (rewritten === null) return block;
        dirty = true;
        return rewritten;
      });
      if (dirty) {
        patched = { ...patched, content: blocks };
      }
    }

    if (!dirty) return original;
    anyMessageChanged = true;
    return patched;
  });

  return anyMessageChanged ? result : messages;
}
/**
 * Normalize message roles to standard OpenAI format.
 *
 * - Roles in VALID_ROLES are left alone (the message is passed through by reference).
 * - Roles with an entry in ROLE_MAPPINGS (e.g. "developer") are rewritten to
 *   the mapped role.
 * - Any other role is rewritten to "user" to avoid API errors.
 *
 * The mapping lookup is restricted to ROLE_MAPPINGS' own properties. Roles
 * come from the client, and a bare index into a plain object with a name such
 * as "constructor", "toString" or "__proto__" would return an inherited
 * Object.prototype member, turning `role` into a function or object.
 *
 * @param messages Chat messages as received from the client.
 * @returns A new array if any role changed, otherwise the original array.
 */
function normalizeMessageRoles(messages: ChatMessage[]): ChatMessage[] {
  if (!messages || messages.length === 0) return messages;
  let hasChanges = false;
  const normalized = messages.map((msg) => {
    if (VALID_ROLES.has(msg.role)) return msg;
    hasChanges = true;
    const mappedRole = Object.prototype.hasOwnProperty.call(ROLE_MAPPINGS, msg.role)
      ? ROLE_MAPPINGS[msg.role]
      : undefined;
    // Unknown role - default to "user" to avoid API errors
    return { ...msg, role: mappedRole || "user" };
  });
  return hasChanges ? normalized : messages;
}
/**
 * Normalize messages for Google models.
 *
 * Google's Gemini API requires the first non-system message to come from
 * "user". When the conversation instead opens with an "assistant" or "model"
 * message, a placeholder user message is inserted directly in front of it.
 * Any other opening role is left as-is.
 *
 * @param messages Chat messages in OpenAI format.
 * @returns A new array with the placeholder inserted, or the original array
 *          when no change is needed.
 */
function normalizeMessagesForGoogle(messages: ChatMessage[]): ChatMessage[] {
  if (!messages || messages.length === 0) return messages;

  const firstTurnIdx = messages.findIndex((msg) => msg.role !== "system");
  if (firstTurnIdx === -1) return messages; // system messages only

  const openingRole = messages[firstTurnIdx].role;
  if (openingRole !== "assistant" && openingRole !== "model") return messages;

  return [
    ...messages.slice(0, firstTurnIdx),
    { role: "user", content: "(continuing conversation)" },
    ...messages.slice(firstTurnIdx),
  ];
}
/**
 * Tell whether a model ID belongs to Google and therefore needs the
 * Google-specific message normalization.
 *
 * @param modelId Model ID, with or without a provider prefix.
 * @returns `true` when the ID starts with "google/" or "gemini".
 */
function isGoogleModel(modelId: string): boolean {
  const googlePrefixes = ["google/", "gemini"];
  return googlePrefixes.some((prefix) => modelId.startsWith(prefix));
}
/**
 * Chat message extended with the fields that matter for thinking-enabled
 * conversations.
 */
type ExtendedChatMessage = ChatMessage & {
  tool_calls?: unknown[];
  reasoning_content?: unknown;
};

/**
 * Normalize messages for thinking-enabled requests.
 *
 * With thinking / extended_thinking enabled, every assistant message that
 * carries a tool call must also carry `reasoning_content`, otherwise upstream
 * rejects the request with:
 * "400 thinking is enabled but reasoning_content is missing in assistant tool call message"
 *
 * Assistant messages with a tool call (OpenAI `tool_calls`, or an Anthropic
 * "tool_use" content block) whose `reasoning_content` is `undefined` get an
 * empty string. Everything else is passed through by reference.
 *
 * @param messages Chat messages to normalize.
 * @returns A new array if any message was patched, otherwise the original array.
 */
function normalizeMessagesForThinking(messages: ExtendedChatMessage[]): ExtendedChatMessage[] {
  if (!messages || messages.length === 0) return messages;

  const carriesToolCall = (msg: ExtendedChatMessage): boolean => {
    // OpenAI format: non-empty tool_calls array
    if (Array.isArray(msg.tool_calls) && msg.tool_calls.length > 0) return true;
    // Anthropic format: content array containing a tool_use block
    return (
      Array.isArray(msg.content) &&
      (msg.content as Array<{ type?: string }>).some((block) => block?.type === "tool_use")
    );
  };

  let patchedAny = false;
  const patched = messages.map((msg) => {
    const lacksReasoning = msg.role === "assistant" && msg.reasoning_content === undefined;
    if (!lacksReasoning || !carriesToolCall(msg)) return msg;
    patchedAny = true;
    return { ...msg, reasoning_content: "" };
  });

  return patchedAny ? patched : messages;
}
/**
 * Result of truncating messages.
 */
type TruncationResult<T> = {
  /** Messages to send (the original array when nothing was dropped). */
  messages: T[];
  /** Whether any message was dropped. */
  wasTruncated: boolean;
  /** Number of messages before truncation. */
  originalCount: number;
  /** Number of messages after truncation. */
  truncatedCount: number;
};

/**
 * Truncate messages to stay under the message-count limit.
 *
 * Keeps all system messages and fills the remaining budget with the most
 * recent conversation messages. Note that in the truncated result the system
 * messages are placed ahead of the conversation.
 *
 * The conversation budget is clamped at zero. Without the clamp, a list whose
 * system messages alone reach the limit would compute `slice(-0)`, which
 * keeps the entire conversation, or a negative budget, which slices from the
 * front. All system messages are still kept in that case, so the result can
 * only exceed the limit when the system messages alone do.
 *
 * @param messages    Messages to truncate.
 * @param maxMessages Maximum number of messages to keep (default: MAX_MESSAGES).
 * @returns The resulting messages together with before/after counts.
 */
function truncateMessages<T extends { role: string }>(
  messages: T[],
  maxMessages: number = MAX_MESSAGES,
): TruncationResult<T> {
  if (!messages || messages.length <= maxMessages) {
    return {
      messages,
      wasTruncated: false,
      originalCount: messages?.length ?? 0,
      truncatedCount: messages?.length ?? 0,
    };
  }

  // Separate system messages from conversation
  const systemMsgs = messages.filter((m) => m.role === "system");
  const conversationMsgs = messages.filter((m) => m.role !== "system");

  // Keep all system messages + most recent conversation messages
  const conversationBudget = Math.max(0, maxMessages - systemMsgs.length);
  const truncatedConversation =
    conversationBudget > 0 ? conversationMsgs.slice(-conversationBudget) : [];
  const result = [...systemMsgs, ...truncatedConversation];

  console.log(
    `[ClawRouter] Truncated messages: ${messages.length} → ${result.length} (kept ${systemMsgs.length} system + ${truncatedConversation.length} recent)`,
  );

  return {
    messages: result,
    wasTruncated: true,
    originalCount: messages.length,
    truncatedCount: result.length,
  };
}
// Kimi/Moonshot models use special Unicode tokens for thinking boundaries.
// Pattern: <{bar}begin{sep}of{sep}thinking{bar}>content<{bar}end{sep}of{sep}thinking{bar}>
// where {bar} is the FULLWIDTH VERTICAL LINE (U+FF5C) and {sep} is the
// LOWER ONE EIGHTH BLOCK (U+2581).
//
// U+FF5C is written as an explicit \uFF5C escape. It is visually almost
// identical to the ASCII "|" and is folded into it by Unicode compatibility
// normalisation, which would silently reduce the class to ASCII-only. The
// ASCII bar is accepted as well.
// Match full Kimi thinking blocks: <{bar}begin...{bar}>content<{bar}end...{bar}>
const KIMI_BLOCK_RE =
  /<[\uFF5C|][^<>]*begin[^<>]*[\uFF5C|]>[\s\S]*?<[\uFF5C|][^<>]*end[^<>]*[\uFF5C|]>/gi;
// Match standalone Kimi tokens such as a dangling end-of-thinking marker
const KIMI_TOKEN_RE = /<[\uFF5C|][^<>]*[\uFF5C|]>/g;
// Standard thinking tags that may leak through from various models
const THINKING_TAG_RE = /<\s*\/?\s*(?:think(?:ing)?|thought|antthinking)\b[^>]*>/gi;
// Full thinking blocks: <think>content</think>
const THINKING_BLOCK_RE =
  /<\s*(?:think(?:ing)?|thought|antthinking)\b[^>]*>[\s\S]*?<\s*\/\s*(?:think(?:ing)?|thought|antthinking)\s*>/gi;

/**
 * Strip thinking tokens and blocks from model response content.
 * Handles both Kimi-style Unicode tokens and standard XML-style tags.
 *
 * Whole blocks are removed before standalone tokens, so the text between a
 * begin/end pair is dropped together with its delimiters, while an unmatched
 * delimiter is removed on its own.
 *
 * NOTE: DSML tags (<|DSML|...>) are NOT stripped - those are tool calls
 * that should be handled by the API, not hidden from users.
 *
 * @param content Raw assistant content.
 * @returns The content without thinking markup; empty input is returned as-is.
 */
function stripThinkingTokens(content: string): string {
  if (!content) return content;
  // Strip full Kimi thinking blocks first (begin...end with content)
  let cleaned = content.replace(KIMI_BLOCK_RE, "");
  // Strip remaining standalone Kimi tokens
  cleaned = cleaned.replace(KIMI_TOKEN_RE, "");
  // Strip full thinking blocks (<think>...</think>)
  cleaned = cleaned.replace(THINKING_BLOCK_RE, "");
  // Strip remaining standalone thinking tags
  cleaned = cleaned.replace(THINKING_TAG_RE, "");
  return cleaned;
}
/** Callback info for low balance warning */
export type LowBalanceInfo = {
balanceUSD: string;
walletAddress: string;
};
/** Callback info for insufficient funds error */
export type InsufficientFundsInfo = {
balanceUSD: string;
requiredUSD: string;
walletAddress: string;
};
/** Options accepted when starting the proxy. Only `walletKey` is required. */
export type ProxyOptions = {
/**
 * Wallet key used for payments.
 * NOTE(review): presumably a private key handed to viem's privateKeyToAccount
 * (imported by this file) - confirm at the usage site.
 */
walletKey: string;
/**
 * Upstream API base URL.
 * NOTE(review): presumably defaults to BLOCKRUN_API when omitted - confirm.
 */
apiBase?: string;
/** Port to listen on (default: 8402) */
port?: number;
/**
 * Partial routing configuration.
 * NOTE(review): presumably merged over DEFAULT_ROUTING_CONFIG via
 * mergeRoutingConfig() - confirm at the usage site.
 */
routingConfig?: Partial<RoutingConfig>;
/**
* Custom payment-enabled fetch function. When provided, replaces the built-in
* x402 payment handler. Use this to integrate alternative payment systems
* (e.g., spend limits, budget enforcement, or custom payment flows).
*
* If not provided, uses built-in createPaymentFetch with walletKey.
*/
paymentFetch?: (
input: RequestInfo | URL,
init?: RequestInit,
preAuth?: PreAuthParams,
) => Promise<Response>;
/** Request timeout in ms (default: 180000 = 3 minutes). Covers on-chain tx + LLM response. */
requestTimeoutMs?: number;
/** Skip balance checks (for testing only). Default: false */
skipBalanceCheck?: boolean;
/**
* Session persistence config. When enabled, maintains model selection
* across requests within a session to prevent mid-task model switching.
*/
sessionConfig?: Partial<SessionConfig>;
/**
* Auto-compress large requests to reduce network usage.
* When enabled, requests are automatically compressed using
* LLM-safe context compression (15-40% reduction).
* Default: true
*/
autoCompressRequests?: boolean;
/**
* Threshold in KB to trigger auto-compression (default: 180).
* Requests larger than this are compressed before sending.
* Set to 0 to compress all requests.
*/
compressionThresholdKB?: number;
/**
* Response caching config. When enabled, identical requests return
* cached responses instead of making new API calls.
* Default: enabled with 10 minute TTL, 200 max entries.
*/
cacheConfig?: ResponseCacheConfig;
/** Receives the port number. NOTE(review): presumably fired once the server is listening - confirm. */
onReady?: (port: number) => void;
/** Receives an Error. NOTE(review): which failures are reported here is not visible in this chunk. */
onError?: (error: Error) => void;
/** Receives the model, amount and network of a payment. */
onPayment?: (info: { model: string; amount: string; network: string }) => void;
/** Receives the routing decision made for a request. */
onRouted?: (decision: RoutingDecision) => void;
/** Called when balance drops below $1.00 (warning, request still proceeds) */
onLowBalance?: (info: LowBalanceInfo) => void;
/** Called when balance is insufficient for a request (request fails) */
onInsufficientFunds?: (info: InsufficientFundsInfo) => void;
};
/** Handle describing a running proxy. */
export type ProxyHandle = {
/** Port of the proxy. */
port: number;
/** Base URL of the proxy. NOTE(review): exact form (e.g. with or without /v1) not visible here. */
baseUrl: string;
/** Wallet address associated with the proxy. */
walletAddress: string;
/** Balance monitor associated with the proxy's wallet. */
balanceMonitor: BalanceMonitor;
/** Asynchronously close the proxy; the promise resolves when closing has finished. */
close: () => Promise<void>;
};
/**
 * Build the model-ID to pricing lookup from BLOCKRUN_MODELS.
 *
 * The AUTO_MODEL meta-model is left out. If an ID appears more than once, the
 * last entry wins.
 *
 * @returns Map from model ID to its input/output price.
 */
function buildModelPricing(): Map<string, ModelPricing> {
  const pricedModels = BLOCKRUN_MODELS.filter((model) => model.id !== AUTO_MODEL);
  return new Map<string, ModelPricing>(
    pricedModels.map((model): [string, ModelPricing] => [
      model.id,
      { inputPrice: model.inputPrice, outputPrice: model.outputPrice },
    ]),
  );
}
/** One entry of the `/v1/models` response. */
type ModelListEntry = {
  id: string;
  object: "model";
  created: number;
  owned_by: string;
};

/**
 * Build `/v1/models` response entries from the full OpenClaw model registry.
 * This includes alias IDs (e.g., `flash`, `kimi`) so `/model <alias>` works reliably.
 *
 * Duplicate IDs are emitted once, at their first position. `owned_by` is the
 * part of the ID before the first "/", or "blockrun" for IDs without a slash.
 *
 * @param createdAt Unix timestamp (seconds) stamped on every entry; defaults to now.
 * @returns Model list entries in registry order.
 */
export function buildProxyModelList(
  createdAt: number = Math.floor(Date.now() / 1000),
): ModelListEntry[] {
  const emitted = new Set<string>();
  const entries: ModelListEntry[] = [];

  for (const { id } of OPENCLAW_MODELS) {
    if (emitted.has(id)) continue;
    emitted.add(id);

    const owner = id.includes("/") ? (id.split("/")[0] ?? "blockrun") : "blockrun";
    entries.push({
      id,
      object: "model",
      created: createdAt,
      owned_by: owner,
    });
  }

  return entries;
}
/**
 * Layer partial routing overrides on top of DEFAULT_ROUTING_CONFIG.
 *
 * Top-level fields are replaced wholesale; the `classifier`, `scoring`,
 * `tiers` and `overrides` sections are merged one level deep, so an override
 * that sets a single key in one of them keeps the remaining defaults.
 *
 * @param overrides Partial config, or nothing.
 * @returns The merged config. Without overrides, the shared
 *          DEFAULT_ROUTING_CONFIG object itself is returned (not a copy).
 */
function mergeRoutingConfig(overrides?: Partial<RoutingConfig>): RoutingConfig {
  const defaults = DEFAULT_ROUTING_CONFIG;
  if (!overrides) return defaults;

  const { classifier, scoring, tiers, overrides: ruleOverrides } = overrides;
  return {
    ...defaults,
    ...overrides,
    classifier: { ...defaults.classifier, ...classifier },
    scoring: { ...defaults.scoring, ...scoring },
    tiers: { ...defaults.tiers, ...tiers },
    overrides: { ...defaults.overrides, ...ruleOverrides },
  };
}
/**
 * Estimate the USDC cost of a request from the model's pricing.
 *
 * Input tokens are approximated as one token per 4 characters of body.
 * Output tokens are `maxTokens` if non-zero, else the model's `maxOutput`,
 * else 4096. Prices are applied per 1,000,000 tokens, a 20% buffer is added
 * for estimation error, and the result is floored at 100 ($0.0001) to avoid
 * zero-amount rejections.
 *
 * @param modelId    Model to price; must be present in BLOCKRUN_MODELS.
 * @param bodyLength Length of the request body in characters.
 * @param maxTokens  Requested output-token cap (0 to use the model default).
 * @returns Amount in USDC smallest units (6 decimals) as a decimal string, or
 *          `undefined` when the model is unknown.
 */
function estimateAmount(
  modelId: string,
  bodyLength: number,
  maxTokens: number,
): string | undefined {
  const pricing = BLOCKRUN_MODELS.find((candidate) => candidate.id === modelId);
  if (!pricing) return undefined;

  const inputTokens = Math.ceil(bodyLength / 4);
  const outputTokens = maxTokens || pricing.maxOutput || 4096;

  const inputCostUsd = (inputTokens / 1_000_000) * pricing.inputPrice;
  const outputCostUsd = (outputTokens / 1_000_000) * pricing.outputPrice;
  const costUsd = inputCostUsd + outputCostUsd;

  const bufferedMicros = Math.ceil(costUsd * 1.2 * 1_000_000);
  return Math.max(100, bufferedMicros).toString();
}
/**
* Proxy a partner API request through x402 payment flow.
*
* Simplified proxy for partner endpoints (/v1/x/*, /v1/partner/*).
* No smart routing, SSE, compression, or sessions — just collect body,
* forward via payFetch (which handles 402 automatically), and stream back.
*/
async function proxyPartnerRequest(
req: IncomingMessage,
res: ServerResponse,
apiBase: string,
payFetch: (
input: RequestInfo | URL,
init?: RequestInit,
preAuth?: PreAuthParams,
) => Promise<Response>,
): Promise<void> {
const startTime = Date.now();
const upstreamUrl = `${apiBase}${req.url}`;
// Collect request body
const bodyChunks: Buffer[] = [];
for await (const chunk of req) {
bodyChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
const body = Buffer.concat(bodyChunks);
// Forward headers (strip hop-by-hop)
const headers: Record<string, string> = {};
for (const [key, value] of Object.entries(req.headers)) {
if (key === "host" || key === "connection" || key === "transfer-encoding" || key === "content-length")
continue;
if (typeof value === "string") headers[key] = value;
}
if (!headers["content-type"]) headers["content-type"] = "application/json";
headers["user-agent"] = USER_AGENT;
console.log(`[ClawRouter] Partner request: ${req.method} ${req.url}`);
const upstream = await payFetch(upstreamUrl, {
method: req.method ?? "POST",
headers,
body: body.length > 0 ? new Uint8Array(body) : undefined,
});
// Forward response headers