diff --git a/AGENTS.md b/AGENTS.md index 72fdba7..55f1820 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -12,6 +12,7 @@ ## 参考项目 +- 代理转发/转换参考[litellm](.reference/litellm) - 代理转发/转换参考[new-api](.reference/new-api) - kiro、codex、antigravity等2api参考[CLIProxyAPIPlus](.reference/CLIProxyAPIPlus) - CLIProxyAPIPlus的可视化app参考[quotio](.reference/quotio) diff --git a/crates/token_proxy_core/src/proxy/anthropic_compat.test.rs b/crates/token_proxy_core/src/proxy/anthropic_compat.test.rs index ae387b8..84f426a 100644 --- a/crates/token_proxy_core/src/proxy/anthropic_compat.test.rs +++ b/crates/token_proxy_core/src/proxy/anthropic_compat.test.rs @@ -89,6 +89,81 @@ fn anthropic_request_to_responses_maps_tools_and_tool_blocks() { assert_eq!(input_items[2]["output"], json!("ok")); } +#[test] +fn anthropic_request_to_responses_maps_reasoning_context_and_structured_output() { + let http_clients = ProxyHttpClients::new().expect("http clients"); + + let input = bytes_from_json(json!({ + "model": "claude-3-7-sonnet", + "max_tokens": 256, + "system": [{ "type": "text", "text": "sys" }], + "thinking": { "type": "enabled", "budget_tokens": 6000 }, + "output_format": { + "type": "json_schema", + "schema": { + "type": "object", + "properties": { "answer": { "type": "string" } }, + "required": ["answer"] + } + }, + "context_management": { + "edits": [ + { + "type": "compact_20260112", + "trigger": { "type": "input_tokens", "value": 150000 } + } + ] + }, + "metadata": { "user_id": "user-123" }, + "tools": [ + { "type": "web_search_20250305", "name": "web_search" } + ], + "messages": [ + { + "role": "assistant", + "content": [ + { "type": "thinking", "thinking": "chain-of-thought summary" }, + { "type": "text", "text": "draft answer" } + ] + } + ] + })); + + let output = run_async(async { + anthropic_request_to_responses(&input, &http_clients) + .await + .expect("transform") + }); + let value = json_from_bytes(output); + + assert_eq!(value["reasoning"]["effort"], json!("medium")); + assert_eq!(value["reasoning"]["summary"], json!("detailed")); + assert_eq!(value["text"]["format"]["type"], json!("json_schema")); + assert_eq!( + value["text"]["format"]["schema"]["required"], + json!(["answer"]) + ); + assert_eq!(value["context_management"][0]["type"], json!("compaction")); + assert_eq!( + value["context_management"][0]["compact_threshold"], + json!(150000) + ); + assert_eq!(value["user"], json!("user-123")); + assert_eq!(value["tools"][0]["type"], json!("web_search_preview")); + + let input_items = value["input"].as_array().expect("input array"); + assert_eq!(input_items.len(), 1); + assert_eq!(input_items[0]["type"], json!("message")); + assert_eq!(input_items[0]["role"], json!("assistant")); + assert_eq!(input_items[0]["content"][0]["type"], json!("output_text")); + assert_eq!( + input_items[0]["content"][0]["text"], + json!("chain-of-thought summary") + ); + assert_eq!(input_items[0]["content"][1]["type"], json!("output_text")); + assert_eq!(input_items[0]["content"][1]["text"], json!("draft answer")); +} + #[test] fn responses_request_to_anthropic_maps_tool_choice_and_tool_result() { let http_clients = ProxyHttpClients::new().expect("http clients"); @@ -157,6 +232,42 @@ fn responses_request_to_anthropic_maps_tool_choice_and_tool_result() { assert_eq!(messages[2]["content"][0]["content"], json!("ok")); } +#[test] +fn responses_response_to_anthropic_maps_reasoning_items_to_thinking_blocks() { + let input = bytes_from_json(json!({ + "id": "resp_reasoning_item", + "model": "gpt-5", + "output": [ + { + "id": "rs_1", + "type": "reasoning", + "summary": [ + { "type": "summary_text", "text": "first analyze then answer" } + ] + }, + { + "type": "message", + "role": "assistant", + "content": [ + { "type": "output_text", "text": "final answer" } + ] + } + ], + "usage": { "input_tokens": 3, "output_tokens": 5 } + })); + + let output = responses_response_to_anthropic(&input, None).expect("transform"); + let value = json_from_bytes(output); + + assert_eq!(value["content"][0]["type"], json!("thinking")); + assert_eq!( + value["content"][0]["thinking"], + json!("first analyze then answer") + ); + assert_eq!(value["content"][1]["type"], json!("text")); + assert_eq!(value["content"][1]["text"], json!("final answer")); +} + #[test] fn responses_response_to_anthropic_includes_thinking_block() { let input = bytes_from_json(json!({ diff --git a/crates/token_proxy_core/src/proxy/anthropic_compat/request.rs b/crates/token_proxy_core/src/proxy/anthropic_compat/request.rs index 9614550..462699b 100644 --- a/crates/token_proxy_core/src/proxy/anthropic_compat/request.rs +++ b/crates/token_proxy_core/src/proxy/anthropic_compat/request.rs @@ -153,6 +153,28 @@ pub(super) async fn anthropic_request_to_responses( out.insert("top_p".to_string(), top_p.clone()); } + if let Some(reasoning) = map_anthropic_thinking_to_responses_reasoning(object.get("thinking")) + { + out.insert("reasoning".to_string(), reasoning); + } + + if let Some(text_format) = map_anthropic_output_format_to_responses_text( + object.get("output_format"), + object.get("output_config"), + ) { + out.insert("text".to_string(), text_format); + } + + if let Some(context_management) = + map_anthropic_context_management_to_responses(object.get("context_management")) + { + out.insert("context_management".to_string(), context_management); + } + + if let Some(user) = map_anthropic_metadata_to_responses_user(object.get("metadata")) { + out.insert("user".to_string(), Value::String(user)); + } + if let Some(stop) = tools::map_anthropic_stop_sequences_to_openai_stop(object.get("stop_sequences")) { @@ -374,6 +396,13 @@ fn claude_message_to_responses_input_items( message_parts.push(json!({ "type": text_part_type, "text": text })); } } + "thinking" => { + if let Some(text) = block.get("thinking").and_then(Value::as_str) { + if !text.is_empty() { + message_parts.push(json!({ "type": "output_text", "text": text })); + } + } + } "image" => { if let Some(part) = media::claude_image_block_to_input_image_part(block) { message_parts.push(part); @@ -626,3 +655,95 @@ fn ensure_claude_content_array_in_place(content: &mut Value) { } *content = Value::Array(Vec::new()); } + +fn map_anthropic_thinking_to_responses_reasoning(value: Option<&Value>) -> Option { + let thinking = value?.as_object()?; + if thinking.get("type").and_then(Value::as_str) != Some("enabled") { + return None; + } + + let budget = thinking + .get("budget_tokens") + .and_then(Value::as_i64) + .unwrap_or(0); + let effort = if budget >= 10_000 { + "high" + } else if budget >= 5_000 { + "medium" + } else if budget >= 2_000 { + "low" + } else { + "minimal" + }; + + Some(json!({ + "effort": effort, + "summary": "detailed" + })) +} + +fn map_anthropic_output_format_to_responses_text( + output_format: Option<&Value>, + output_config: Option<&Value>, +) -> Option { + let format = match output_format { + Some(Value::Object(object)) => Some(object), + _ => output_config + .and_then(Value::as_object) + .and_then(|config| config.get("format")) + .and_then(Value::as_object), + }?; + + if format.get("type").and_then(Value::as_str) != Some("json_schema") { + return None; + } + let schema = format.get("schema")?; + Some(json!({ + "format": { + "type": "json_schema", + "name": "structured_output", + "schema": schema, + "strict": true + } + })) +} + +fn map_anthropic_context_management_to_responses(value: Option<&Value>) -> Option { + let context_management = value?.as_object()?; + let edits = context_management.get("edits")?.as_array()?; + let mut mapped = Vec::new(); + for edit in edits { + let Some(edit) = edit.as_object() else { + continue; + }; + if edit.get("type").and_then(Value::as_str) != Some("compact_20260112") { + continue; + } + let mut item = Map::new(); + item.insert("type".to_string(), json!("compaction")); + if let Some(value) = edit + .get("trigger") + .and_then(Value::as_object) + .and_then(|trigger| trigger.get("value")) + .and_then(Value::as_i64) + { + item.insert("compact_threshold".to_string(), json!(value)); + } + mapped.push(Value::Object(item)); + } + if mapped.is_empty() { + None + } else { + Some(Value::Array(mapped)) + } +} + +fn map_anthropic_metadata_to_responses_user(value: Option<&Value>) -> Option { + let metadata = value?.as_object()?; + let user = metadata.get("user_id")?.as_str()?.trim(); + if user.is_empty() { + None + } else { + Some(user.chars().take(64).collect()) + } +} diff --git a/crates/token_proxy_core/src/proxy/anthropic_compat/response.rs b/crates/token_proxy_core/src/proxy/anthropic_compat/response.rs index d738fc3..b50daed 100644 --- a/crates/token_proxy_core/src/proxy/anthropic_compat/response.rs +++ b/crates/token_proxy_core/src/proxy/anthropic_compat/response.rs @@ -52,6 +52,12 @@ pub(super) fn responses_response_to_anthropic( continue; }; match item.get("type").and_then(Value::as_str) { + Some("reasoning") => { + let summary = extract_reasoning_summary(item); + if !summary.is_empty() { + thinking_text.push_str(&summary); + } + } Some("message") => { if item.get("role").and_then(Value::as_str) != Some("assistant") { continue; @@ -203,6 +209,25 @@ pub(super) fn anthropic_response_to_responses(body: &Bytes) -> Result) -> String { + let Some(summary) = item.get("summary").and_then(Value::as_array) else { + return String::new(); + }; + let mut combined = String::new(); + for part in summary { + let Some(part) = part.as_object() else { + continue; + }; + if part.get("type").and_then(Value::as_str) != Some("summary_text") { + continue; + } + if let Some(text) = part.get("text").and_then(Value::as_str) { + combined.push_str(text); + } + } + combined +} + fn responses_function_call_to_tool_use(item: &Map) -> Option { let call_id = item.get("call_id").and_then(Value::as_str).unwrap_or(""); let item_id = item.get("id").and_then(Value::as_str).unwrap_or(""); diff --git a/crates/token_proxy_core/src/proxy/anthropic_compat/tools.rs b/crates/token_proxy_core/src/proxy/anthropic_compat/tools.rs index 8b2e9b6..adc80a9 100644 --- a/crates/token_proxy_core/src/proxy/anthropic_compat/tools.rs +++ b/crates/token_proxy_core/src/proxy/anthropic_compat/tools.rs @@ -62,6 +62,11 @@ pub(super) fn map_anthropic_tools_to_responses(value: &Value) -> Value { fn map_anthropic_tool(value: &Value) -> Option { let tool = value.as_object()?; + let tool_type = tool.get("type").and_then(Value::as_str).unwrap_or(""); + let tool_name = tool.get("name").and_then(Value::as_str).unwrap_or(""); + if tool_type.starts_with("web_search") || tool_name == "web_search" { + return Some(json!({ "type": "web_search_preview" })); + } let name = tool.get("name").and_then(Value::as_str)?; let mut out = Map::new(); out.insert("type".to_string(), json!("function")); diff --git a/crates/token_proxy_core/src/proxy/gemini_compat/request.rs b/crates/token_proxy_core/src/proxy/gemini_compat/request.rs index f69b24c..de15126 100644 --- a/crates/token_proxy_core/src/proxy/gemini_compat/request.rs +++ b/crates/token_proxy_core/src/proxy/gemini_compat/request.rs @@ -393,6 +393,10 @@ fn function_response_to_chat_message(part: &serde_json::Map) -> O if !name.is_empty() { if let Some(message) = message.as_object_mut() { message.insert("name".to_string(), Value::String(name.to_string())); + message.insert( + "tool_call_id".to_string(), + Value::String(format!("call_{name}")), + ); } } Some(message) diff --git a/crates/token_proxy_core/src/proxy/gemini_compat/request.test.rs b/crates/token_proxy_core/src/proxy/gemini_compat/request.test.rs index 4d49824..0105669 100644 --- a/crates/token_proxy_core/src/proxy/gemini_compat/request.test.rs +++ b/crates/token_proxy_core/src/proxy/gemini_compat/request.test.rs @@ -55,4 +55,39 @@ fn gemini_request_to_chat_maps_function_response() { let value: Value = serde_json::from_slice(&output).expect("json"); assert_eq!(value["messages"][0]["role"], json!("tool")); assert_eq!(value["messages"][0]["name"], json!("getFoo")); + assert_eq!(value["messages"][0]["tool_call_id"], json!("call_getFoo")); +} + +#[test] +fn gemini_request_to_chat_maps_parameters_json_schema() { + let input = json!({ + "contents": [ + { "role": "user", "parts": [{ "text": "hi" }] } + ], + "tools": [{ + "functionDeclarations": [ + { + "name": "getFoo", + "description": "x", + "parametersJsonSchema": { + "type": "object", + "properties": { "query": { "type": "string" } }, + "required": ["query"] + } + } + ] + }] + }); + + let output = gemini_request_to_chat( + &Bytes::from(serde_json::to_vec(&input).unwrap()), + Some("gemini-1.5-flash"), + ) + .expect("convert"); + let value: Value = serde_json::from_slice(&output).expect("json"); + + assert_eq!( + value["tools"][0]["function"]["parameters"]["properties"]["query"]["type"], + json!("string") + ); } diff --git a/crates/token_proxy_core/src/proxy/gemini_compat/stream.rs b/crates/token_proxy_core/src/proxy/gemini_compat/stream.rs index 96f732f..7f7317f 100644 --- a/crates/token_proxy_core/src/proxy/gemini_compat/stream.rs +++ b/crates/token_proxy_core/src/proxy/gemini_compat/stream.rs @@ -505,9 +505,18 @@ where if state.name.is_empty() { return None; } - let args: Value = serde_json::from_str(&state.arguments).unwrap_or_else(|_| json!({})); + let args = if state.arguments.is_empty() { + json!({}) + } else { + match serde_json::from_str::(&state.arguments) { + Ok(args) => args, + Err(_) => return None, + } + }; + let name = state.name.clone(); + self.tool_calls[index] = None; Some(json!({ - "functionCall": { "name": state.name, "args": args } + "functionCall": { "name": name, "args": args } })) } diff --git a/crates/token_proxy_core/src/proxy/gemini_compat/tools.rs b/crates/token_proxy_core/src/proxy/gemini_compat/tools.rs index 2063504..983e95e 100644 --- a/crates/token_proxy_core/src/proxy/gemini_compat/tools.rs +++ b/crates/token_proxy_core/src/proxy/gemini_compat/tools.rs @@ -98,6 +98,7 @@ pub(super) fn map_gemini_tools_to_chat(value: &Value) -> Value { .unwrap_or(""); let parameters = declaration .get("parameters") + .or_else(|| declaration.get("parametersJsonSchema")) .cloned() .unwrap_or_else(|| json!({})); tools.push(json!({ diff --git a/crates/token_proxy_core/src/proxy/openai_compat.rs b/crates/token_proxy_core/src/proxy/openai_compat.rs index 87f6214..6b9a1b2 100644 --- a/crates/token_proxy_core/src/proxy/openai_compat.rs +++ b/crates/token_proxy_core/src/proxy/openai_compat.rs @@ -184,6 +184,7 @@ fn chat_request_to_responses(body: &Bytes) -> Result { copy_key(object, &mut output, "parallel_tool_calls"); copy_key(object, &mut output, "modalities"); copy_key(object, &mut output, "audio"); + copy_key(object, &mut output, "previous_response_id"); if let Some(max_output_tokens) = object .get("max_completion_tokens") @@ -213,6 +214,14 @@ fn chat_request_to_responses(body: &Bytes) -> Result { text_obj.insert("format".to_string(), response_format.clone()); output.insert("text".to_string(), Value::Object(text_obj)); } + if let Some(reasoning) = + map_chat_reasoning_effort_to_responses_reasoning(object.get("reasoning_effort")) + { + output.insert("reasoning".to_string(), reasoning); + } + if object.get("web_search_options").is_some() { + append_responses_web_search_tool(&mut output, object.get("web_search_options")); + } serde_json::to_vec(&Value::Object(output)) .map(Bytes::from) @@ -420,7 +429,7 @@ fn push_chat_tool_message(input: &mut Vec, message: &Map) .get("tool_call_id") .and_then(Value::as_str) .unwrap_or(""); - let output = message::stringify_any_json(message.get("content")); + let output = message::chat_tool_content_to_responses_output(message.get("content")); input.push(json!({ "type": "function_call_output", "call_id": call_id, @@ -428,6 +437,40 @@ fn push_chat_tool_message(input: &mut Vec, message: &Map) })); } +fn map_chat_reasoning_effort_to_responses_reasoning(value: Option<&Value>) -> Option { + match value { + Some(Value::String(effort)) if !effort.trim().is_empty() => { + Some(json!({ "effort": effort })) + } + Some(Value::Object(object)) if !object.is_empty() => Some(Value::Object(object.clone())), + _ => None, + } +} + +fn append_responses_web_search_tool( + output: &mut Map, + web_search_options: Option<&Value>, +) { + let tools = output + .entry("tools".to_string()) + .or_insert_with(|| Value::Array(Vec::new())); + if !matches!(tools, Value::Array(_)) { + *tools = Value::Array(Vec::new()); + } + let Value::Array(items) = tools else { + return; + }; + + let mut tool = Map::new(); + tool.insert("type".to_string(), Value::String("web_search".to_string())); + if let Some(Value::Object(options)) = web_search_options { + for (key, value) in options { + tool.insert(key.clone(), value.clone()); + } + } + items.push(Value::Object(tool)); +} + fn responses_response_to_chat(bytes: &Bytes, model_hint: Option<&str>) -> Result { let value: Value = serde_json::from_slice(bytes).map_err(|_| "Upstream response must be JSON.".to_string())?; diff --git a/crates/token_proxy_core/src/proxy/openai_compat.test.part2.rs b/crates/token_proxy_core/src/proxy/openai_compat.test.part2.rs index 1e2a9e0..d3ff651 100644 --- a/crates/token_proxy_core/src/proxy/openai_compat.test.part2.rs +++ b/crates/token_proxy_core/src/proxy/openai_compat.test.part2.rs @@ -55,6 +55,62 @@ fn responses_and_gemini_request_conversions() { assert_eq!(gemini_value["max_output_tokens"], json!(64)); assert_eq!(gemini_value["top_p"], json!(0.8)); } + +#[test] +fn chat_request_to_responses_maps_advanced_optional_params() { + let http_clients = ProxyHttpClients::new().expect("http clients"); + let value = transform_request_value( + FormatTransform::ChatToResponses, + json!({ + "model": "gpt-5", + "messages": [{ "role": "user", "content": "hi" }], + "reasoning_effort": "high", + "previous_response_id": "resp_prev_123", + "web_search_options": { "search_context_size": "high" } + }), + &http_clients, + None, + ); + + assert_eq!(value["reasoning"]["effort"], json!("high")); + assert_eq!(value["previous_response_id"], json!("resp_prev_123")); + assert_eq!(value["tools"][0]["type"], json!("web_search")); + assert_eq!(value["tools"][0]["search_context_size"], json!("high")); +} + +#[test] +fn chat_request_to_responses_preserves_structured_tool_output() { + let http_clients = ProxyHttpClients::new().expect("http clients"); + let value = transform_request_value( + FormatTransform::ChatToResponses, + json!({ + "model": "gpt-4.1", + "messages": [ + { "role": "user", "content": "show image result" }, + { + "role": "tool", + "tool_call_id": "call_123", + "content": [ + { "type": "text", "text": "done" }, + { "type": "image_url", "image_url": { "url": "https://example.com/result.png" } } + ] + } + ] + }), + &http_clients, + None, + ); + + assert_eq!(value["input"][1]["type"], json!("function_call_output")); + assert_eq!(value["input"][1]["call_id"], json!("call_123")); + assert_eq!(value["input"][1]["output"][0]["type"], json!("input_text")); + assert_eq!(value["input"][1]["output"][0]["text"], json!("done")); + assert_eq!(value["input"][1]["output"][1]["type"], json!("input_image")); + assert_eq!( + value["input"][1]["output"][1]["image_url"]["url"], + json!("https://example.com/result.png") + ); +} #[test] fn gemini_and_anthropic_request_conversions() { let http_clients = ProxyHttpClients::new().expect("http clients"); diff --git a/crates/token_proxy_core/src/proxy/openai_compat/input.rs b/crates/token_proxy_core/src/proxy/openai_compat/input.rs index 7366b06..d781d0d 100644 --- a/crates/token_proxy_core/src/proxy/openai_compat/input.rs +++ b/crates/token_proxy_core/src/proxy/openai_compat/input.rs @@ -61,11 +61,16 @@ fn responses_function_call_output_item_to_chat_message( .get("call_id") .and_then(Value::as_str) .ok_or_else(|| "function_call_output must include call_id.".to_string())?; - let output = item.get("output").and_then(Value::as_str).unwrap_or(""); + let content = match item.get("output") { + Some(Value::String(text)) => Value::String(text.to_string()), + Some(value) => responses_message_content_to_chat_content(value) + .unwrap_or_else(|| Value::String(String::new())), + None => Value::String(String::new()), + }; Ok(json!({ "role": "tool", "tool_call_id": call_id, - "content": output + "content": content })) } diff --git a/crates/token_proxy_core/src/proxy/openai_compat/message.rs b/crates/token_proxy_core/src/proxy/openai_compat/message.rs index dd7904a..a2f3281 100644 --- a/crates/token_proxy_core/src/proxy/openai_compat/message.rs +++ b/crates/token_proxy_core/src/proxy/openai_compat/message.rs @@ -165,6 +165,24 @@ pub(super) fn stringify_any_json(value: Option<&Value>) -> String { } } +pub(super) fn chat_tool_content_to_responses_output(content: Option<&Value>) -> Value { + match content { + None => Value::Array(Vec::new()), + Some(Value::String(text)) => Value::Array(vec![json!({ + "type": "input_text", + "text": text + })]), + Some(Value::Array(_)) => Value::Array( + chat_content_to_responses_message_parts(content, "input_text") + .unwrap_or_else(|_| Vec::new()), + ), + Some(other) => Value::Array(vec![json!({ + "type": "input_text", + "text": stringify_any_json(Some(other)) + })]), + } +} + pub(super) fn user_placeholder_item() -> Value { json!({ "type": "message", diff --git a/crates/token_proxy_core/src/proxy/response.test.part2.rs b/crates/token_proxy_core/src/proxy/response.test.part2.rs index 6762e49..f8b812b 100644 --- a/crates/token_proxy_core/src/proxy/response.test.part2.rs +++ b/crates/token_proxy_core/src/proxy/response.test.part2.rs @@ -177,3 +177,145 @@ fn stream_responses_to_chat_persists_log_when_client_drops_stream_early() { ); }); } + +#[test] +fn stream_responses_to_anthropic_emits_thinking_from_reasoning_summary_events() { + super::run_async(async { + let context = LogContext { + path: "/v1/messages".to_string(), + provider: "openai-response".to_string(), + upstream_id: "unit-test".to_string(), + model: Some("unit-model".to_string()), + mapped_model: Some("unit-model".to_string()), + stream: true, + status: 200, + upstream_request_id: None, + request_headers: None, + request_body: None, + ttfb_ms: None, + start: Instant::now(), + }; + + let upstream = futures_util::stream::iter(vec![ + Ok::(Bytes::from( + "data: {\"type\":\"response.output_item.added\",\"item\":{\"id\":\"rs_1\",\"type\":\"reasoning\"}}\n\n", + )), + Ok(Bytes::from( + "data: {\"type\":\"response.reasoning_summary_text.delta\",\"item_id\":\"rs_1\",\"delta\":\"think step by step\"}\n\n", + )), + Ok(Bytes::from( + "data: {\"type\":\"response.completed\",\"response\":{\"output\":[{\"id\":\"rs_1\",\"type\":\"reasoning\",\"summary\":[{\"type\":\"summary_text\",\"text\":\"think step by step\"}]}],\"usage\":{\"input_tokens\":1,\"output_tokens\":2}}}\n\n", + )), + Ok(Bytes::from("data: [DONE]\n\n")), + ]); + + let token_tracker = crate::proxy::token_rate::TokenRateTracker::new() + .register(None, None) + .await; + let anthropic_stream = super::super::responses_to_anthropic::stream_responses_to_anthropic( + upstream, + context, + Arc::new(LogWriter::new(None)), + token_tracker, + ); + + let chunks: Vec = anthropic_stream + .map(|item| item.expect("stream item")) + .collect() + .await; + + let mut saw_thinking_start = false; + let mut saw_thinking_delta = false; + for chunk in &chunks { + let Some((event_type, data)) = super::parse_anthropic_sse(chunk) else { + continue; + }; + if event_type == "content_block_start" + && data["content_block"]["type"] == json!("thinking") + { + saw_thinking_start = true; + } + if event_type == "content_block_delta" + && data["delta"]["type"] == json!("thinking_delta") + && data["delta"]["thinking"] == json!("think step by step") + { + saw_thinking_delta = true; + } + } + + assert!(saw_thinking_start, "missing thinking content_block_start"); + assert!( + saw_thinking_delta, + "missing thinking_delta from reasoning summary" + ); + }); +} + +#[test] +fn stream_chat_to_gemini_waits_for_complete_tool_call_arguments() { + super::run_async(async { + let context = LogContext { + path: "/v1/messages".to_string(), + provider: "openai".to_string(), + upstream_id: "unit-test".to_string(), + model: Some("unit-model".to_string()), + mapped_model: Some("unit-model".to_string()), + stream: true, + status: 200, + upstream_request_id: None, + request_headers: None, + request_body: None, + ttfb_ms: None, + start: Instant::now(), + }; + + let upstream = futures_util::stream::iter(vec![ + Ok::(Bytes::from( + "data: {\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"city\\\":\"}}]}}]}\n\n", + )), + Ok(Bytes::from( + "data: {\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"\\\"Paris\\\"}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n", + )), + Ok(Bytes::from("data: [DONE]\n\n")), + ]); + + let token_tracker = crate::proxy::token_rate::TokenRateTracker::new() + .register(None, None) + .await; + let gemini_stream = crate::proxy::gemini_compat::stream_chat_to_gemini( + upstream, + context, + Arc::new(LogWriter::new(None)), + token_tracker, + ); + + let chunks: Vec = gemini_stream + .map(|item| item.expect("stream item")) + .collect() + .await; + + let payloads = chunks + .iter() + .filter_map(super::parse_sse_json) + .collect::>(); + + let function_calls = payloads + .iter() + .flat_map(|payload| { + payload["candidates"] + .as_array() + .into_iter() + .flatten() + .filter_map(|candidate| candidate["content"]["parts"].as_array()) + .flatten() + .filter_map(|part| part.get("functionCall")) + .cloned() + .collect::>() + }) + .collect::>(); + + assert_eq!(function_calls.len(), 1); + assert_eq!(function_calls[0]["name"], json!("get_weather")); + assert_eq!(function_calls[0]["args"]["city"], json!("Paris")); + }); +} diff --git a/crates/token_proxy_core/src/proxy/response/responses_to_anthropic.rs b/crates/token_proxy_core/src/proxy/response/responses_to_anthropic.rs index ceca8bf..e2b49c2 100644 --- a/crates/token_proxy_core/src/proxy/response/responses_to_anthropic.rs +++ b/crates/token_proxy_core/src/proxy/response/responses_to_anthropic.rs @@ -41,6 +41,13 @@ struct ToolUseState { sent_input: bool, } +struct ReasoningBlockState { + index: usize, + sent_start: bool, + sent_stop: bool, + sent_delta: bool, +} + struct ResponsesToAnthropicState { upstream: S, parser: SseEventParser, @@ -58,6 +65,7 @@ struct ResponsesToAnthropicState { active_block: Option, next_block_index: usize, tool_uses: HashMap, + reasoning_blocks: HashMap, saw_tool_use: bool, stop_reason_override: Option<&'static str>, saw_reasoning_delta: bool, @@ -114,6 +122,7 @@ where active_block: None, next_block_index: 0, tool_uses: HashMap::new(), + reasoning_blocks: HashMap::new(), saw_tool_use: false, stop_reason_override: None, saw_reasoning_delta: false, @@ -188,7 +197,9 @@ where self.handle_output_text_delta(&value, token_texts); return; } - if event_type.ends_with("reasoning_text.delta") { + if event_type.ends_with("reasoning_text.delta") + || event_type.ends_with("reasoning_summary_text.delta") + { self.handle_reasoning_text_delta(&value, token_texts); return; } @@ -242,7 +253,16 @@ where self.saw_reasoning_delta = true; token_texts.push(delta.to_string()); self.ensure_message_start(); - let index = self.ensure_thinking_block(); + let index = match value.get("item_id").and_then(Value::as_str) { + Some(item_id) if !item_id.is_empty() => { + let index = self.ensure_reasoning_block(item_id); + if let Some(state) = self.reasoning_blocks.get_mut(item_id) { + state.sent_delta = true; + } + index + } + _ => self.ensure_thinking_block(), + }; self.out.push_back(super::anthropic_event_sse( "content_block_delta", json!({ @@ -257,23 +277,32 @@ where let Some(item) = value.get("item").and_then(Value::as_object) else { return; }; - if item.get("type").and_then(Value::as_str) != Some("function_call") { - return; + match item.get("type").and_then(Value::as_str) { + Some("function_call") => { + let item_id = item.get("id").and_then(Value::as_str).unwrap_or(""); + let call_id = item.get("call_id").and_then(Value::as_str).unwrap_or(""); + let name = item.get("name").and_then(Value::as_str).unwrap_or(""); + + let tool_use_id = if !call_id.is_empty() { + call_id.to_string() + } else if !item_id.is_empty() { + item_id.to_string() + } else { + "tool_use_proxy".to_string() + }; + + self.ensure_message_start(); + self.ensure_tool_use_block(item_id, &tool_use_id, name); + } + Some("reasoning") => { + let Some(item_id) = item.get("id").and_then(Value::as_str) else { + return; + }; + self.ensure_message_start(); + self.ensure_reasoning_block(item_id); + } + _ => {} } - let item_id = item.get("id").and_then(Value::as_str).unwrap_or(""); - let call_id = item.get("call_id").and_then(Value::as_str).unwrap_or(""); - let name = item.get("name").and_then(Value::as_str).unwrap_or(""); - - let tool_use_id = if !call_id.is_empty() { - call_id.to_string() - } else if !item_id.is_empty() { - item_id.to_string() - } else { - "tool_use_proxy".to_string() - }; - - self.ensure_message_start(); - self.ensure_tool_use_block(item_id, &tool_use_id, name); } fn handle_function_call_arguments_delta(&mut self, value: &Value) { @@ -326,15 +355,24 @@ where let Some(item) = value.get("item").and_then(Value::as_object) else { return; }; - if item.get("type").and_then(Value::as_str) != Some("function_call") { - return; + match item.get("type").and_then(Value::as_str) { + Some("function_call") => { + let Some(item_id) = item.get("id").and_then(Value::as_str) else { + return; + }; + self.ensure_message_start(); + self.ensure_tool_use_state(item_id); + self.stop_tool_use_block(item_id); + } + Some("reasoning") => { + let Some(item_id) = item.get("id").and_then(Value::as_str) else { + return; + }; + self.ensure_message_start(); + self.stop_reasoning_block(item_id); + } + _ => {} } - let Some(item_id) = item.get("id").and_then(Value::as_str) else { - return; - }; - self.ensure_message_start(); - self.ensure_tool_use_state(item_id); - self.stop_tool_use_block(item_id); } fn handle_response_completed(&mut self, value: &Value) { @@ -386,6 +424,24 @@ where self.stop_tool_use_block(item_id); } } + Some("reasoning") => { + let summary = extract_reasoning_summary(item); + if summary.trim().is_empty() { + continue; + } + if let Some(item_id) = item.get("id").and_then(Value::as_str) { + let already_emitted = self + .reasoning_blocks + .get(item_id) + .is_some_and(|state| state.sent_delta); + if !already_emitted { + self.emit_reasoning_summary_for_item(item_id, &summary); + } + self.stop_reasoning_block(item_id); + } else if reasoning_snapshot.is_empty() { + reasoning_snapshot = summary; + } + } Some("message") => { if item.get("role").and_then(Value::as_str) != Some("assistant") { continue; @@ -466,6 +522,106 @@ where index } + fn ensure_reasoning_state(&mut self, item_id: &str) -> &mut ReasoningBlockState { + self.reasoning_blocks + .entry(item_id.to_string()) + .or_insert_with(|| { + let index = self.next_block_index; + self.next_block_index += 1; + ReasoningBlockState { + index, + sent_start: false, + sent_stop: false, + sent_delta: false, + } + }) + } + + fn ensure_reasoning_block(&mut self, item_id: &str) -> usize { + let index = self.ensure_reasoning_state(item_id).index; + let sent_start = self + .reasoning_blocks + .get(item_id) + .is_some_and(|state| state.sent_start); + if !sent_start { + self.start_reasoning_block(item_id); + return index; + } + if !matches!( + self.active_block, + Some(ActiveBlock::Thinking { index: active }) if active == index + ) { + self.stop_active_block(); + self.active_block = Some(ActiveBlock::Thinking { index }); + } + index + } + + fn start_reasoning_block(&mut self, item_id: &str) { + let index = self.ensure_reasoning_state(item_id).index; + let sent_start = self + .reasoning_blocks + .get(item_id) + .is_some_and(|state| state.sent_start); + if sent_start { + return; + } + + self.stop_active_block(); + if let Some(state) = self.reasoning_blocks.get_mut(item_id) { + state.sent_start = true; + } + self.active_block = Some(ActiveBlock::Thinking { index }); + self.out.push_back(super::anthropic_event_sse( + "content_block_start", + json!({ + "type": "content_block_start", + "index": index, + "content_block": { "type": "thinking", "thinking": "" } + }), + )); + } + + fn emit_reasoning_summary_for_item(&mut self, item_id: &str, text: &str) { + if text.trim().is_empty() { + return; + } + self.saw_reasoning_delta = true; + self.ensure_message_start(); + let index = self.ensure_reasoning_block(item_id); + self.out.push_back(super::anthropic_event_sse( + "content_block_delta", + json!({ + "type": "content_block_delta", + "index": index, + "delta": { "type": "thinking_delta", "thinking": text } + }), + )); + if let Some(state) = self.reasoning_blocks.get_mut(item_id) { + state.sent_delta = true; + } + } + + fn stop_reasoning_block(&mut self, item_id: &str) { + let Some(state) = self.reasoning_blocks.get_mut(item_id) else { + return; + }; + if state.sent_stop || !state.sent_start { + return; + } + state.sent_stop = true; + if matches!( + &self.active_block, + Some(ActiveBlock::Thinking { index }) if *index == state.index + ) { + self.active_block = None; + } + self.out.push_back(super::anthropic_event_sse( + "content_block_stop", + json!({ "type": "content_block_stop", "index": state.index }), + )); + } + fn emit_reasoning_snapshot(&mut self, text: &str) { if self.saw_reasoning_delta || text.trim().is_empty() { return; @@ -725,3 +881,22 @@ fn extract_reasoning_text(parts: &[Value]) -> String { } reasoning } + +fn extract_reasoning_summary(item: &Map) -> String { + let Some(summary) = item.get("summary").and_then(Value::as_array) else { + return String::new(); + }; + let mut combined = String::new(); + for part in summary { + let Some(part) = part.as_object() else { + continue; + }; + if part.get("type").and_then(Value::as_str) != Some("summary_text") { + continue; + } + if let Some(text) = part.get("text").and_then(Value::as_str) { + combined.push_str(text); + } + } + combined +}