Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Notes:
| `retryable_failure_cooldown_secs` | `15` | Cooldown window after retryable failures that should temporarily sideline an upstream. `0` disables cooldown. Reloading or restarting the running proxy resets current cooldown state. |
| `tray_token_rate.enabled` | `true` | macOS tray live rate; harmless elsewhere. |
| `tray_token_rate.format` | `split` | `combined` (`total`), `split` (`↑in ↓out`), `both` (`total | ↑in ↓out`). |
| `upstream_strategy` | `priority_fill_first` | `priority_fill_first` (default) keeps trying the highest-priority group in list order; `priority_round_robin` rotates within each priority group. |
| `upstream_strategy` | `{ "order": "fill_first", "dispatch": { "type": "serial" } }` | Structured strategy object. `order` controls candidate ordering inside one priority group; `dispatch` controls serial / hedged / race execution. |

### Upstream entries (`upstreams[]`)
| Field | Default | Notes |
Expand Down Expand Up @@ -147,7 +147,14 @@ Notes:
- **Gemini**: `upstream.api_key` → `x-goog-api-key` → query `?key=...` → error.

## Load balancing & retries
- Priorities: higher `priority` groups first; inside a group use list order (fill-first) or round-robin (if `priority_round_robin`).
- Priorities: higher `priority` groups first.
- `upstream_strategy.order` controls selection inside the same priority group:
- `fill_first`: keep the configured list order.
- `round_robin`: rotate the starting point across requests.
- `upstream_strategy.dispatch` controls how requests are launched inside one priority group:
- `{"type":"serial"}`: try one candidate at a time.
- `{"type":"hedged","delay_ms":2000,"max_parallel":2}`: launch the first candidate immediately, then add one more attempt after `delay_ms` if the prior attempt is still unresolved, up to `max_parallel`.
- `{"type":"race","max_parallel":3}`: launch up to `max_parallel` candidates immediately and take the first successful result.
- Retryable conditions: network timeout/connect errors, or status 400/401/403/404/408/422/429/307/5xx (including 504/524). Retries stay within the same provider's priority groups.
- Cooldown conditions: `401/403/408/429/5xx` will temporarily move the failed upstream behind ready peers for `retryable_failure_cooldown_secs` (default `15`); `400/404/422/307` stay retryable but do not trigger cross-request cooldown.
- `/v1/messages` only: after the chosen native provider is exhausted (retryable errors), the proxy can fall back to the other native provider (`anthropic` ↔ `kiro`) if it is configured.
Expand Down
11 changes: 9 additions & 2 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pnpm exec tsc --noEmit
| `retryable_failure_cooldown_secs` | `15` | 对适合短时降级的可重试失败施加冷却窗口;`0` 表示关闭冷却。重载或重启运行中的代理会重置当前冷却状态 |
| `tray_token_rate.enabled` | `true` | macOS 托盘实时速率;其他平台无害 |
| `tray_token_rate.format` | `split` | `combined`(总数) / `split`(↑入 ↓出) / `both`(总数 | ↑入 ↓出) |
| `upstream_strategy` | `priority_fill_first` | `priority_fill_first` 默认先填满高优先级;`priority_round_robin` 在同组内轮询 |
| `upstream_strategy` | `{ "order": "fill_first", "dispatch": { "type": "serial" } }` | 结构化策略对象。`order` 控制同一优先级组内的候选顺序;`dispatch` 控制串行 / hedged / race 派发方式 |

### 上游条目(`upstreams[]`)
| 字段 | 默认值 | 说明 |
Expand Down Expand Up @@ -147,7 +147,14 @@ pnpm exec tsc --noEmit
- **Gemini**:`upstream.api_key` → `x-goog-api-key` → 查询参数 `?key=` → 报错

## 负载均衡与重试
- 优先级:高优先级组先尝试;组内按列表顺序(fill-first)或轮询(round-robin)
- 优先级:高优先级组先尝试。
- `upstream_strategy.order` 控制同一优先级组内的选择顺序:
- `fill_first`:保持配置列表顺序。
- `round_robin`:跨请求轮换起点。
- `upstream_strategy.dispatch` 控制同一优先级组内的发起方式:
- `{"type":"serial"}`:一次只尝试一个候选。
- `{"type":"hedged","delay_ms":2000,"max_parallel":2}`:先立即发第一个;若 `delay_ms` 后仍未决,再补发下一个,最多并发到 `max_parallel`。
- `{"type":"race","max_parallel":3}`:立即并发发起最多 `max_parallel` 个候选,谁先成功就返回谁。
- 可重试条件:网络超时/连接错误,或状态码 400/401/403/404/408/422/429/307/5xx(包含 504/524);重试只在同一 provider 的优先级组内进行
- 冷却条件:`401/403/408/429/5xx` 会让失败 upstream 在 `retryable_failure_cooldown_secs`(默认 `15`)内被暂时后置;`400/404/422/307` 仍可重试,但不会触发跨请求冷却
- 仅 `/v1/messages`:当命中的 native provider(`anthropic`/`kiro`)被耗尽(仍是可重试错误)时,若另一个 native provider 已配置,会自动 fallback(Anthropic ↔ Kiro)
Expand Down
6 changes: 6 additions & 0 deletions crates/token_proxy_core/src/proxy/config/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const DEFAULT_CONFIG_HEADER: &str = concat!(
"// Token Proxy config (JSONC). Comments and trailing commas are supported.\n",
"// log_level (optional): silent|error|warn|info|debug|trace. Default: silent.\n",
"// upstream_no_data_timeout_secs (optional): upstream no-data timeout in seconds. Minimum: 3. Default: 120.\n",
"// upstream_strategy (optional): { order: \"fill_first\"|\"round_robin\", dispatch: { type: \"serial\"|\"hedged\"|\"race\", ... } }.\n",
"// Example hedged: { \"order\": \"round_robin\", \"dispatch\": { \"type\": \"hedged\", \"delay_ms\": 2000, \"max_parallel\": 2 } }\n",
"// upstreams[].api_keys (optional): one or more API keys for the same upstream. Example: [\"key-a\", \"key-b\"].\n",
"// app_proxy_url (optional): http(s)://... | socks5(h)://... (used for app updates and upstream proxy reuse).\n",
"// upstreams[].proxy_url (optional): empty => direct; \"$app_proxy_url\" => use app_proxy_url; or an explicit proxy URL.\n",
Expand Down Expand Up @@ -218,3 +220,7 @@ async fn ensure_parent_dir(path: &Path) -> Result<(), String> {
);
Ok(())
}

#[cfg(test)]
#[path = "io.test.rs"]
mod tests;
28 changes: 28 additions & 0 deletions crates/token_proxy_core/src/proxy/config/io.test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use super::*;
use std::path::Path;

#[test]
fn parse_config_file_migrates_legacy_upstream_strategy_string() {
let parsed = parse_config_file(
r#"
{
"host": "127.0.0.1",
"port": 9208,
"upstream_strategy": "priority_fill_first",
"upstreams": []
}
"#,
Path::new("/tmp/config.jsonc"),
)
.expect("legacy config should migrate");

assert!(parsed.migrated);
assert_eq!(
parsed.config.upstream_strategy.order,
crate::proxy::config::UpstreamOrderStrategy::FillFirst
);
assert_eq!(
parsed.config.upstream_strategy.dispatch,
crate::proxy::config::UpstreamDispatchStrategy::Serial
);
}
38 changes: 37 additions & 1 deletion crates/token_proxy_core/src/proxy/config/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,16 @@ pub(super) fn migrate_config_json(root: &mut Value) -> bool {
.is_some_and(|obj| obj.contains_key("api_key"))
})
});
let is_legacy_config = had_legacy_enable || had_legacy_provider || had_legacy_api_key;
let had_legacy_upstream_strategy = root_obj
.get("upstream_strategy")
.and_then(Value::as_str)
.is_some_and(|value| {
matches!(value.trim(), "priority_fill_first" | "priority_round_robin")
});
let is_legacy_config = had_legacy_enable
|| had_legacy_provider
|| had_legacy_api_key
|| had_legacy_upstream_strategy;

// 仅当检测到旧字段时才进行迁移;否则避免对新配置做“默认填充”,改变用户语义。
if !is_legacy_config {
Expand All @@ -45,6 +54,7 @@ pub(super) fn migrate_config_json(root: &mut Value) -> bool {

let mut changed = false;
changed |= had_legacy_enable;
changed |= migrate_legacy_upstream_strategy(root_obj);

let Some(upstreams_value) = root_obj.get_mut("upstreams") else {
return changed;
Expand All @@ -60,6 +70,32 @@ pub(super) fn migrate_config_json(root: &mut Value) -> bool {
changed
}

fn migrate_legacy_upstream_strategy(root_obj: &mut Map<String, Value>) -> bool {
let Some(value) = root_obj.get("upstream_strategy").and_then(Value::as_str) else {
return false;
};
let order = match value.trim() {
"priority_fill_first" => "fill_first",
"priority_round_robin" => "round_robin",
_ => return false,
};

root_obj.insert(
"upstream_strategy".to_string(),
Value::Object(Map::from_iter([
("order".to_string(), Value::String(order.to_string())),
(
"dispatch".to_string(),
Value::Object(Map::from_iter([(
"type".to_string(),
Value::String("serial".to_string()),
)])),
),
])),
);
true
}

fn migrate_single_upstream(upstream: &mut Value, legacy_enable_conversion: bool) -> bool {
let Some(obj) = upstream.as_object_mut() else {
return false;
Expand Down
49 changes: 49 additions & 0 deletions crates/token_proxy_core/src/proxy/config/migrate.test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,52 @@ fn migrate_api_key_to_api_keys() {
assert_eq!(keys.len(), 1);
assert_eq!(keys[0].as_str(), Some("key-1"));
}
#[test]
fn migrate_legacy_upstream_strategy_string_to_structured_fill_first_serial() {
let mut value = parse_json(
r#"
{
"host": "127.0.0.1",
"port": 9208,
"upstream_strategy": "priority_fill_first",
"upstreams": []
}
"#,
);

let changed = migrate_config_json(&mut value);
assert!(changed);

assert_eq!(
value["upstream_strategy"],
serde_json::json!({
"order": "fill_first",
"dispatch": { "type": "serial" }
})
);
}

#[test]
fn migrate_legacy_upstream_strategy_string_to_structured_round_robin_serial() {
let mut value = parse_json(
r#"
{
"host": "127.0.0.1",
"port": 9208,
"upstream_strategy": "priority_round_robin",
"upstreams": []
}
"#,
);

let changed = migrate_config_json(&mut value);
assert!(changed);

assert_eq!(
value["upstream_strategy"],
serde_json::json!({
"order": "round_robin",
"dispatch": { "type": "serial" }
})
);
}
45 changes: 43 additions & 2 deletions crates/token_proxy_core/src/proxy/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ const MIN_UPSTREAM_NO_DATA_TIMEOUT_SECS: u64 = 3;
pub use types::{
ConfigResponse, HeaderOverride, InboundApiFormat, KiroPreferredEndpoint, ProviderUpstreams,
ProxyConfig, ProxyConfigFile, TrayTokenRateConfig, TrayTokenRateFormat, UpstreamConfig,
UpstreamGroup, UpstreamOverrides, UpstreamRuntime, UpstreamStrategy,
UpstreamDispatchRuntime, UpstreamDispatchStrategy, UpstreamGroup, UpstreamOrderStrategy,
UpstreamOverrides, UpstreamRuntime, UpstreamStrategy, UpstreamStrategyRuntime,
};

pub async fn read_config(paths: &TokenProxyPaths) -> Result<ConfigResponse, String> {
Expand Down Expand Up @@ -75,7 +76,7 @@ fn build_runtime_config(config: ProxyConfigFile) -> Result<ProxyConfig, String>
upstream_no_data_timeout: resolve_upstream_no_data_timeout(
config.upstream_no_data_timeout_secs,
)?,
upstream_strategy: config.upstream_strategy,
upstream_strategy: resolve_upstream_strategy(config.upstream_strategy)?,
upstreams,
kiro_preferred_endpoint: config.kiro_preferred_endpoint,
antigravity_user_agent: config.antigravity_user_agent,
Expand Down Expand Up @@ -103,6 +104,46 @@ fn resolve_upstream_no_data_timeout(value: u64) -> Result<Duration, String> {
Ok(duration)
}

fn resolve_upstream_strategy(value: UpstreamStrategy) -> Result<UpstreamStrategyRuntime, String> {
let dispatch = match value.dispatch {
UpstreamDispatchStrategy::Serial => UpstreamDispatchRuntime::Serial,
UpstreamDispatchStrategy::Hedged {
delay_ms,
max_parallel,
} => UpstreamDispatchRuntime::Hedged {
delay: resolve_hedged_delay(delay_ms)?,
max_parallel: resolve_parallel_attempts("hedged", max_parallel)?,
},
UpstreamDispatchStrategy::Race { max_parallel } => UpstreamDispatchRuntime::Race {
max_parallel: resolve_parallel_attempts("race", max_parallel)?,
},
};
Ok(UpstreamStrategyRuntime {
order: value.order,
dispatch,
})
}

fn resolve_hedged_delay(value: u64) -> Result<Duration, String> {
if value == 0 {
return Err("upstream_strategy.dispatch.delay_ms must be at least 1.".to_string());
}
let duration = Duration::from_millis(value);
if Instant::now().checked_add(duration).is_none() {
return Err("upstream_strategy.dispatch.delay_ms is too large.".to_string());
}
Ok(duration)
}

fn resolve_parallel_attempts(dispatch: &str, value: u64) -> Result<usize, String> {
if value < 2 {
return Err(format!(
"upstream_strategy.dispatch.max_parallel must be at least 2 for {dispatch}."
));
}
usize::try_from(value)
.map_err(|_| "upstream_strategy.dispatch.max_parallel is too large.".to_string())
}
fn resolve_max_request_body_bytes(value: Option<u64>) -> usize {
let value = value.unwrap_or(DEFAULT_MAX_REQUEST_BODY_BYTES);
let value = if value == 0 {
Expand Down
91 changes: 91 additions & 0 deletions crates/token_proxy_core/src/proxy/config/mod.test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,97 @@ fn build_runtime_config_maps_upstream_no_data_timeout_secs() {
assert_eq!(runtime.upstream_no_data_timeout, Duration::from_secs(3));
}

#[test]
fn build_runtime_config_maps_hedged_strategy() {
let mut config = ProxyConfigFile::default();
config.upstream_strategy = UpstreamStrategy {
order: UpstreamOrderStrategy::RoundRobin,
dispatch: UpstreamDispatchStrategy::Hedged {
delay_ms: 250,
max_parallel: 3,
},
};

let runtime = build_runtime_config(config).expect("runtime config");

assert_eq!(
runtime.upstream_strategy.order,
UpstreamOrderStrategy::RoundRobin
);
assert_eq!(
runtime.upstream_strategy.dispatch,
UpstreamDispatchRuntime::Hedged {
delay: Duration::from_millis(250),
max_parallel: 3,
}
);
}

#[test]
fn build_runtime_config_maps_race_strategy() {
let mut config = ProxyConfigFile::default();
config.upstream_strategy = UpstreamStrategy {
order: UpstreamOrderStrategy::RoundRobin,
dispatch: UpstreamDispatchStrategy::Race { max_parallel: 4 },
};

let runtime = build_runtime_config(config).expect("runtime config");

assert_eq!(
runtime.upstream_strategy.order,
UpstreamOrderStrategy::RoundRobin
);
assert_eq!(
runtime.upstream_strategy.dispatch,
UpstreamDispatchRuntime::Race { max_parallel: 4 }
);
}

#[test]
fn build_runtime_config_rejects_hedged_strategy_with_zero_delay() {
let mut config = ProxyConfigFile::default();
config.upstream_strategy = UpstreamStrategy {
order: UpstreamOrderStrategy::FillFirst,
dispatch: UpstreamDispatchStrategy::Hedged {
delay_ms: 0,
max_parallel: 2,
},
};

let result = build_runtime_config(config);

assert!(result.is_err());
}

#[test]
fn build_runtime_config_rejects_hedged_strategy_with_max_parallel_below_two() {
let mut config = ProxyConfigFile::default();
config.upstream_strategy = UpstreamStrategy {
order: UpstreamOrderStrategy::FillFirst,
dispatch: UpstreamDispatchStrategy::Hedged {
delay_ms: 250,
max_parallel: 1,
},
};

let result = build_runtime_config(config);

assert!(result.is_err());
}

#[test]
fn build_runtime_config_rejects_race_strategy_with_max_parallel_below_two() {
let mut config = ProxyConfigFile::default();
config.upstream_strategy = UpstreamStrategy {
order: UpstreamOrderStrategy::FillFirst,
dispatch: UpstreamDispatchStrategy::Race { max_parallel: 1 },
};

let result = build_runtime_config(config);

assert!(result.is_err());
}

#[test]
fn build_runtime_config_rejects_upstream_no_data_timeout_below_minimum() {
let mut config = ProxyConfigFile::default();
Expand Down
Loading
Loading