Skip to content

Commit b53a1e9

Browse files
committed
feat: 新增 Agent 评估运行流式入口并结合 Langfuse 的观测性
1 parent 3429493 commit b53a1e9

21 files changed

Lines changed: 1407 additions & 7 deletions
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""Agent evaluation run service.
2+
3+
This service intentionally does not implement dataset storage or judging. It
4+
creates a normal conversation-backed AgentRun, blocks until it finishes, and
5+
returns the run's final result by reusing the shared agent_run base capability.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import uuid
11+
from typing import Any
12+
13+
from fastapi import HTTPException
14+
from sqlalchemy.ext.asyncio import AsyncSession
15+
16+
from yuxi.repositories.agent_repository import AgentRepository
17+
from yuxi.repositories.conversation_repository import ConversationRepository
18+
from yuxi.services.agent_run_service import await_agent_run_result, create_agent_run_view
19+
from yuxi.storage.postgres.models_business import User
20+
21+
# Value stored under the "source" key of both the conversation metadata and the run meta.
EVALUATION_SOURCE = "agent_evaluation"
22+
# The only keys that _normalize_evaluation keeps from an incoming evaluation dict.
EVALUATION_FIELDS = ("dataset_name", "dataset_item_id", "experiment_name")
23+
# Longest request_id (measured after stripping) that _normalize_request_id accepts.
MAX_REQUEST_ID_LENGTH = 64
24+
25+
26+
def _normalize_evaluation(evaluation: dict[str, Any] | None) -> dict[str, str]:
    """Keep only the known evaluation fields, as stripped non-empty strings.

    Anything that is not a dict yields an empty mapping. Keys outside
    ``EVALUATION_FIELDS`` are dropped, and so are values that are ``None`` or
    blank once converted with ``str()`` and stripped.
    """
    if not isinstance(evaluation, dict):
        return {}

    # Lazily walk the known fields in their declared order so the resulting
    # dict keeps that order.
    present = ((field, evaluation.get(field)) for field in EVALUATION_FIELDS)
    stripped = ((field, str(raw).strip()) for field, raw in present if raw is not None)
    return {field: text for field, text in stripped if text}
40+
41+
42+
def _normalize_request_id(meta: dict[str, Any] | None) -> str:
    """Return the stripped, length-checked request_id, or a new UUID if absent.

    A missing, ``None`` or blank ``request_id`` in ``meta`` is replaced by a
    freshly generated UUID4 string.

    Raises:
        HTTPException: 422 when the stripped request_id is longer than
            ``MAX_REQUEST_ID_LENGTH`` characters.
    """
    supplied = (meta or {}).get("request_id")
    candidate = "" if supplied is None else str(supplied).strip()

    if not candidate:
        return str(uuid.uuid4())
    if len(candidate) <= MAX_REQUEST_ID_LENGTH:
        return candidate
    raise HTTPException(status_code=422, detail=f"request_id 不能超过 {MAX_REQUEST_ID_LENGTH} 个字符")
52+
53+
54+
async def run_agent_eval(
    *,
    query: str,
    agent_slug: str,
    evaluation: dict[str, Any] | None,
    meta: dict[str, Any] | None,
    image_content: str | None,
    model_spec: str | None,
    current_user: User,
    db: AsyncSession,
) -> dict[str, Any]:
    """Create an evaluation AgentRun, block until it ends, and return its final result.

    Evaluation callers only care about the final output, so no SSE streaming
    wrapper is used: once the run is created, ``await_agent_run_result`` is
    reused to wait for the run and return its result. Note that this keeps the
    HTTP request blocked until the run ends (no intermediate bytes are sent),
    so idle timeouts along the gateway path must be relaxed for long runs.

    Args:
        query: User query for the agent. Must contain non-whitespace text; it
            is forwarded to the run unchanged.
        agent_slug: Slug of the agent to run; surrounding whitespace is stripped.
        evaluation: Optional evaluation descriptor; only the known fields are
            kept (see ``_normalize_evaluation``).
        meta: Optional request metadata; ``request_id`` and
            ``attachment_file_ids`` are read from it.
        image_content: Optional image payload passed through to the run.
        model_spec: Optional model override passed through to the run.
        current_user: Authenticated user who owns the conversation and the run.
        db: Request-scoped async session used for the lookup and the writes.

    Returns:
        The result dict produced by ``await_agent_run_result``.

    Raises:
        HTTPException: 422 for a blank ``agent_slug``, a blank ``query`` or an
            over-long ``request_id``; 404 when the agent is not visible to the
            user.
    """
    agent_slug = agent_slug.strip()
    if not agent_slug:
        raise HTTPException(status_code=422, detail="agent_slug 不能为空")
    # Whitespace-only queries are rejected as well (agent_slug is already
    # checked after stripping); otherwise a blank query would still create a
    # conversation and a run and block the request until that run ends.
    if not query or not query.strip():
        raise HTTPException(status_code=422, detail="query 不能为空")

    agent_item = await AgentRepository(db).get_visible_by_slug(slug=agent_slug, user=current_user)
    if not agent_item:
        raise HTTPException(status_code=404, detail="智能体不存在")

    evaluation_metadata = _normalize_evaluation(evaluation)
    request_id = _normalize_request_id(meta)
    # Every evaluation run gets its own fresh thread / conversation.
    thread_id = str(uuid.uuid4())
    current_uid = str(current_user.uid)

    await ConversationRepository(db).create_conversation(
        uid=current_uid,
        agent_id=agent_item.slug,
        title="Agent Evaluation Run",
        thread_id=thread_id,
        metadata={
            "source": EVALUATION_SOURCE,
            "evaluation": evaluation_metadata,
        },
    )

    run_meta = {
        "request_id": request_id,
        "source": EVALUATION_SOURCE,
        "evaluation": evaluation_metadata,
        "attachment_file_ids": (meta or {}).get("attachment_file_ids") or [],
    }
    run_response = await create_agent_run_view(
        query=query,
        agent_id=agent_item.slug,
        thread_id=thread_id,
        meta=run_meta,
        image_content=image_content,
        current_uid=current_uid,
        db=db,
        model_spec=model_spec,
    )
    return await await_agent_run_result(run_id=run_response["run_id"], current_uid=current_uid)

backend/package/yuxi/services/agent_run_service.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import os
88
import uuid
99
from collections.abc import AsyncIterator
10+
from typing import Any
1011

1112
from fastapi import HTTPException
1213
from sqlalchemy import select
@@ -318,6 +319,8 @@ async def create_agent_run_view(
318319
"uid": str(current_uid),
319320
"request_id": request_id,
320321
"attachment_file_ids": (meta or {}).get("attachment_file_ids") or [],
322+
"source": (meta or {}).get("source"),
323+
"evaluation": (meta or {}).get("evaluation") or None,
321324
"created_at": utc_now_naive().isoformat(),
322325
}
323326
try:
@@ -344,6 +347,10 @@ async def create_agent_run_view(
344347
"attachments": [],
345348
"model_spec": resolved_model_spec,
346349
}
350+
if (meta or {}).get("source"):
351+
input_metadata["source"] = (meta or {}).get("source")
352+
if (meta or {}).get("evaluation"):
353+
input_metadata["evaluation"] = (meta or {}).get("evaluation")
347354
if run_type == "resume":
348355
input_metadata["source"] = "ask_user_question_resume"
349356

@@ -383,6 +390,77 @@ async def get_agent_run_view(*, run_id: str, current_uid: str, db: AsyncSession)
383390
return {"run": run.to_dict()}
384391

385392

393+
def _select_output_message(messages: list[Message], *, output_message_id: int | None) -> Message | None:
    """Pick the run's recorded output message, else the last assistant message.

    The message whose id equals ``output_message_id`` is returned only when its
    role is ``"assistant"``. If there is no such message (or no id was
    recorded), the last assistant message in ``messages`` is returned, or
    ``None`` when the list contains no assistant message.
    """
    if output_message_id:
        recorded = next(
            (item for item in messages if item.id == output_message_id and item.role == "assistant"),
            None,
        )
        if recorded is not None:
            return recorded

    # Fallback: scan from the end for the most recent assistant message.
    return next((item for item in reversed(messages) if item.role == "assistant"), None)
404+
405+
406+
async def get_agent_run_result(*, run_id: str, current_uid: str, db: AsyncSession) -> dict:
    """Load the final result of a run: status, output, Langfuse trace id and error.

    Meant as a shared entry point for chat / eval / cron style callers.

    When ``get_run_for_user`` finds no run for this user, a ``failed`` payload
    carrying a ``run_not_found`` error is returned. Otherwise the output comes
    from the message chosen by ``_select_output_message`` among the
    conversation's messages (ordered by creation time, then id); ``output`` is
    ``""`` and ``final_message_id`` is ``None`` when no message qualifies. The
    ``error`` key is only present when the run has an error type or message.
    """
    owner_uid = str(current_uid)
    run = await AgentRunRepository(db).get_run_for_user(run_id, owner_uid)
    if not run:
        not_found = {"type": "run_not_found", "message": "运行任务不存在"}
        return {"status": "failed", "agent_run_id": run_id, "output": "", "error": not_found}

    conversation_messages: list[Message] = []
    if run.conversation_id:
        ordered_query = (
            select(Message)
            .where(Message.conversation_id == run.conversation_id)
            .order_by(Message.created_at.asc(), Message.id.asc())
        )
        rows = await db.execute(ordered_query)
        conversation_messages = list(rows.scalars().unique().all())

    final_message = _select_output_message(conversation_messages, output_message_id=run.output_message_id)

    # Only a dict-shaped extra_metadata is trusted as a source for the trace id.
    trace_metadata: dict[str, Any] = {}
    if final_message and isinstance(final_message.extra_metadata, dict):
        trace_metadata = final_message.extra_metadata

    if final_message:
        output_text, final_message_id = final_message.content, final_message.id
    else:
        output_text, final_message_id = "", None

    payload: dict[str, Any] = {
        "status": run.status,
        "output": output_text,
        "agent_slug": run.agent_id,
        "thread_id": run.thread_id,
        "conversation_id": run.conversation_id,
        "agent_run_id": run.id,
        "request_id": run.request_id,
        "final_message_id": final_message_id,
        "langfuse_trace_id": trace_metadata.get("langfuse_trace_id"),
    }

    has_error = run.error_type or run.error_message
    if has_error:
        payload["error"] = {"type": run.error_type, "message": run.error_message}
    return payload
445+
446+
447+
async def load_agent_run_result(*, run_id: str, current_uid: str) -> dict:
    """Read a run's result through a dedicated database session.

    Intended for situations where the request-scoped session is no longer
    usable, such as after a stream has ended or from background callers.
    """
    async with pg_manager.get_async_session_context() as session:
        run_result = await get_agent_run_result(run_id=run_id, current_uid=current_uid, db=session)
        return run_result
451+
452+
453+
async def await_agent_run_result(*, run_id: str, current_uid: str) -> dict:
    """Block until the run's event stream ends, then return the final result.

    Intended for in-process callers such as cron. It reuses the finite event
    stream ``stream_agent_run_events``, which ends on its own once the run is
    terminal or the stream times out, so draining it is the wait and no extra
    polling is needed. The maximum wait is inherited from the stream's own
    ``SSE_MAX_CONNECTION_MINUTES`` limit.
    """
    event_stream = stream_agent_run_events(
        run_id=run_id,
        after_seq="0-0",
        current_uid=current_uid,
        verbose=False,
    )
    # Events are discarded on purpose: only the end of the stream matters.
    async for _event in event_stream:
        continue
    return await load_agent_run_result(run_id=run_id, current_uid=current_uid)
462+
463+
386464
async def cancel_agent_run_view(*, run_id: str, current_uid: str, db: AsyncSession) -> dict:
387465
repo = AgentRunRepository(db)
388466
run = await repo.get_run_for_user(run_id, str(current_uid))

backend/package/yuxi/services/chat_service.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,30 @@ def _build_langfuse_run_context(
8585
operation: str,
8686
backend_id: str | None = None,
8787
message_type: str | None = None,
88+
meta: dict | None = None,
8889
) -> LangfuseRunContext:
90+
extra_metadata = None
91+
extra_tags = None
92+
evaluation = (meta or {}).get("evaluation") if isinstance(meta, dict) else None
93+
# 如果请求来自智能体评测,添加评测相关的 metadata 和 tags,方便在 Langfuse 中进行过滤和分析
94+
if (meta or {}).get("source") == "agent_evaluation" or (isinstance(evaluation, dict) and evaluation):
95+
extra_metadata = {
96+
"source": "agent_evaluation",
97+
"feature": "agent_evaluation",
98+
}
99+
extra_tags = ["agent_evaluation"]
100+
if isinstance(evaluation, dict):
101+
dataset_name = evaluation.get("dataset_name")
102+
experiment_name = evaluation.get("experiment_name")
103+
for key in ("dataset_name", "dataset_item_id", "experiment_name"):
104+
value = evaluation.get(key)
105+
if value:
106+
extra_metadata[f"evaluation_{key}"] = str(value)
107+
if dataset_name:
108+
extra_tags.append(f"dataset:{dataset_name}")
109+
if experiment_name:
110+
extra_tags.append(f"experiment:{experiment_name}")
111+
89112
return build_run_context(
90113
user_id=str(getattr(current_user, "uid", current_user.id)),
91114
thread_id=thread_id,
@@ -97,6 +120,8 @@ def _build_langfuse_run_context(
97120
username=getattr(current_user, "username", None),
98121
login_user_id=getattr(current_user, "uid", None),
99122
department_id=getattr(current_user, "department_id", None),
123+
extra_metadata=extra_metadata,
124+
extra_tags=extra_tags,
100125
)
101126

102127

@@ -799,6 +824,7 @@ async def agent_chat(
799824
request_id=meta["request_id"],
800825
operation="agent_chat_sync",
801826
message_type=message_type,
827+
meta=meta,
802828
)
803829
trace_info: dict[str, Any] = {}
804830

@@ -1010,6 +1036,7 @@ def make_chunk(content=None, **kwargs):
10101036
request_id=meta["request_id"],
10111037
operation="agent_chat_stream",
10121038
message_type=message_type,
1039+
meta=meta,
10131040
)
10141041
full_msg = None
10151042
accumulated_content: list[str] = []
@@ -1309,6 +1336,7 @@ def make_resume_chunk(content=None, **kwargs):
13091336
request_id=meta.get("request_id") or str(uuid.uuid4()),
13101337
operation="agent_chat_resume",
13111338
message_type="resume",
1339+
meta=meta,
13121340
)
13131341
trace_info: dict[str, Any] = {}
13141342
last_agent_state_signature = ""

backend/package/yuxi/services/langfuse_service.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def build_trace_metadata(
7070
username: str | None = None,
7171
login_user_id: str | None = None,
7272
department_id: int | str | None = None,
73+
extra_metadata: dict[str, Any] | None = None,
7374
) -> dict[str, Any]:
7475
metadata: dict[str, Any] = {
7576
"langfuse_user_id": user_id,
@@ -92,14 +93,25 @@ def build_trace_metadata(
9293
metadata["login_user_id"] = login_user_id
9394
if department_id is not None:
9495
metadata["department_id"] = str(department_id)
96+
if extra_metadata:
97+
metadata.update(extra_metadata)
9598

9699
return metadata
97100

98101

99-
def build_trace_tags(*, agent_id: str, operation: str, message_type: str | None = None) -> list[str]:
102+
def build_trace_tags(
    *,
    agent_id: str,
    operation: str,
    message_type: str | None = None,
    extra_tags: list[str] | None = None,
) -> list[str]:
    """Build the tag list for a trace.

    The list always starts with ``"yuxi"``, ``"chat"``, the operation and
    ``agent:<agent_id>``. A ``message_type:<type>`` tag follows when
    ``message_type`` is truthy. Each entry of ``extra_tags`` is then appended
    in order, skipping falsy entries and anything already in the list.
    """
    collected = ["yuxi", "chat", operation, f"agent:{agent_id}"]
    if message_type:
        collected += [f"message_type:{message_type}"]

    for candidate in extra_tags or ():
        if not candidate or candidate in collected:
            continue
        collected.append(candidate)
    return collected
104116

105117

@@ -115,6 +127,8 @@ def build_run_context(
115127
username: str | None = None,
116128
login_user_id: str | None = None,
117129
department_id: int | str | None = None,
130+
extra_metadata: dict[str, Any] | None = None,
131+
extra_tags: list[str] | None = None,
118132
) -> LangfuseRunContext:
119133
metadata = build_trace_metadata(
120134
user_id=user_id,
@@ -127,8 +141,14 @@ def build_run_context(
127141
username=username,
128142
login_user_id=login_user_id,
129143
department_id=department_id,
144+
extra_metadata=extra_metadata,
145+
)
146+
tags = build_trace_tags(
147+
agent_id=agent_id,
148+
operation=operation,
149+
message_type=message_type,
150+
extra_tags=extra_tags,
130151
)
131-
tags = build_trace_tags(agent_id=agent_id, operation=operation, message_type=message_type)
132152

133153
client = get_langfuse_client()
134154
if client is None or CallbackHandler is None:

backend/package/yuxi/services/run_worker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,10 @@ async def process_agent_run(ctx, run_id: str):
314314
"attachment_file_ids": payload.get("attachment_file_ids") or [],
315315
"model_spec": payload.get("model_spec"),
316316
}
317+
if payload.get("source"):
318+
meta["source"] = payload.get("source")
319+
if isinstance(payload.get("evaluation"), dict):
320+
meta["evaluation"] = payload.get("evaluation") or {}
317321

318322
await mark_run_running(run_id)
319323
run_ctx = RunContext(run_id=run_id)
@@ -332,6 +336,8 @@ async def process_agent_run(ctx, run_id: str):
332336
"agent_id": agent_id,
333337
"backend_id": payload.get("backend_id"),
334338
"uid": uid,
339+
"source": payload.get("source"),
340+
"evaluation": payload.get("evaluation") or {},
335341
},
336342
thread_id=thread_id,
337343
)

0 commit comments

Comments
 (0)