Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ name: Build and Push Docker Image

on:
push:
branches: [main, master, develop, docker]
branches: ["*"]
tags: ["v*"]
pull_request:
branches: [main, master, develop, docker]
branches: ["*"]

env:
REGISTRY: ghcr.io
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ services:
- REFRESH_CHECK_INTERVAL=${REFRESH_CHECK_INTERVAL:-3600} # 刷新检查间隔(秒)
volumes:
# 可选:挂载日志目录
- ./logs:/app/logs
- logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
Expand Down
103 changes: 99 additions & 4 deletions proxy_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,105 @@ async def handle_chat_completion(self, request: ChatCompletionRequest):
async def stream_response(
self, response: httpx.Response, model: str
) -> AsyncGenerator[str, None]:
"""Generate streaming response"""
async for parsed in self.process_streaming_response(response):
yield f"data: {json.dumps(parsed)}\n\n"
yield "data: [DONE]\n\n"
"""Generate streaming response in OpenAI format"""
import uuid
import time

# Generate a unique completion ID
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"

# Track content for transformation if needed
accumulated_content = ""
current_phase = None

try:
async for parsed in self.process_streaming_response(response):
try:
data = parsed.get("data", {})
delta_content = data.get("delta_content", "")
phase = data.get("phase", "")

# Track phase changes
if phase != current_phase:
current_phase = phase
logger.debug(f"Phase changed to: {phase}")

# For SHOW_THINK_TAGS=false, only send content during answer phase
if (
not settings.SHOW_THINK_TAGS
and phase != "answer"
and delta_content
):
logger.debug(
f"Skipping content in {phase} phase (SHOW_THINK_TAGS=false)"
)
accumulated_content += delta_content # Still accumulate for potential transformation
continue

# Accumulate all content for potential transformation
accumulated_content += delta_content

# Apply content transformation to the delta
if delta_content:
# For streaming, we need to be careful about transformation
# Only transform if we have complete thinking blocks
if settings.SHOW_THINK_TAGS:
# Convert <details> to <think> tags on the fly
transformed_delta = delta_content
transformed_delta = re.sub(
r"<details[^>]*>", "<think>", transformed_delta
)
transformed_delta = transformed_delta.replace(
"</details>", "</think>"
)
transformed_delta = re.sub(
r"<summary>.*?</summary>",
"",
transformed_delta,
flags=re.DOTALL,
)
else:
# For non-think mode in streaming, just pass through answer content
transformed_delta = delta_content

# Create OpenAI-compatible streaming chunk
openai_chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": transformed_delta},
"finish_reason": None,
}
],
}

yield f"data: {json.dumps(openai_chunk)}\n\n"

except Exception as e:
logger.error(f"Error processing streaming chunk: {e}")
continue

# Send final completion chunk
final_chunk = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}

yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"

except Exception as e:
logger.error(f"Streaming error: {e}")
# Send error in OpenAI format
error_chunk = {"error": {"message": str(e), "type": "server_error"}}
yield f"data: {json.dumps(error_chunk)}\n\n"

async def non_stream_response(
self, response: httpx.Response, model: str
Expand Down