Skip to content

Commit 77f098f

Browse files
committed
scope request dedupe to explicit request ids
1 parent 4cd3fe3 commit 77f098f

File tree

2 files changed

+46
-34
lines changed

2 files changed

+46
-34
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ curl -X POST "http://127.0.0.1:6001/v1/images/generations" \
251251
- 可用于:`/v1/chat/completions``/v1/images/generations`
252252
- 查询接口:`GET /v1/requests/{request_id}`
253253
- 服务会在响应头回写 `X-Request-Id`,同时在 JSON 响应体里也包含 `request_id`
254+
- 同一个 `request_id` / `X-Request-Id` 会被视为同一任务的幂等键;如果换了真实请求内容,请务必使用新的 `request_id`
254255

255256
提交示例:
256257

app.py

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ def _normalize_request_id(value: Any) -> str:
251251
return text
252252

253253

254-
def _build_request_dedupe_key(request: Request, path: str, raw_body: bytes) -> str:
254+
def _build_request_payload_hash(raw_body: bytes) -> str:
255255
if not raw_body:
256256
return ""
257257
try:
@@ -262,36 +262,23 @@ def _build_request_dedupe_key(request: Request, path: str, raw_body: bytes) -> s
262262
return ""
263263
if bool(data.get("stream")):
264264
return ""
265-
266265
normalized_data = dict(data)
267-
explicit_request_id = _normalize_request_id(
268-
normalized_data.pop("request_id", None)
269-
or normalized_data.pop("requestId", None)
270-
)
266+
normalized_data.pop("request_id", None)
267+
normalized_data.pop("requestId", None)
271268
normalized_body = json.dumps(
272269
normalized_data,
273270
ensure_ascii=False,
274271
sort_keys=True,
275272
separators=(",", ":"),
276273
)
277-
client_host = str(
278-
getattr(getattr(request, "client", None), "host", "") or ""
279-
).strip()
280-
user_agent = str(request.headers.get("user-agent") or "").strip()
281-
auth_value = str(
282-
request.headers.get("authorization")
283-
or request.headers.get("x-api-key")
284-
or ""
285-
).strip()
286-
scope_seed = "|".join((client_host, user_agent, auth_value))
287-
scope_hash = hashlib.sha256(scope_seed.encode("utf-8")).hexdigest()[:16]
288-
dedupe_seed = (
289-
f"rid:{explicit_request_id}|{normalized_body}"
290-
if explicit_request_id
291-
else normalized_body
292-
)
293-
dedupe_hash = hashlib.sha256(dedupe_seed.encode("utf-8")).hexdigest()
294-
return f"{path}|{scope_hash}|{dedupe_hash}"
274+
return hashlib.sha256(normalized_body.encode("utf-8")).hexdigest()
275+
276+
277+
def _build_request_dedupe_key(path: str, explicit_request_id: str) -> str:
278+
request_id = _normalize_request_id(explicit_request_id)
279+
if not request_id:
280+
return ""
281+
return f"{path}|rid|{request_id}"
295282

296283

297284
def _prune_request_dedupe_entries(now_ts: Optional[float] = None) -> None:
@@ -311,26 +298,30 @@ def _prune_request_dedupe_entries(now_ts: Optional[float] = None) -> None:
311298

312299

313300
def _begin_request_dedupe(
314-
dedupe_key: str, request_id: str
315-
) -> tuple[bool, dict[str, Any] | None]:
301+
dedupe_key: str, request_id: str, payload_hash: str
302+
) -> tuple[str, dict[str, Any] | None]:
316303
if not dedupe_key:
317-
return True, None
304+
return "owner", None
318305
now_ts = time.time()
319306
with _request_dedupe_lock:
320307
_prune_request_dedupe_entries(now_ts)
321308
existing = _request_dedupe_entries.get(dedupe_key)
322309
if isinstance(existing, dict):
310+
existing_payload_hash = str(existing.get("payload_hash") or "")
311+
if existing_payload_hash and payload_hash and existing_payload_hash != payload_hash:
312+
return "conflict", existing
323313
existing["updated_at"] = now_ts
324-
return False, existing
314+
return "join", existing
325315
entry = {
326316
"request_id": request_id,
317+
"payload_hash": payload_hash,
327318
"created_at": now_ts,
328319
"updated_at": now_ts,
329320
"event": threading.Event(),
330321
"response": None,
331322
}
332323
_request_dedupe_entries[dedupe_key] = entry
333-
return True, entry
324+
return "owner", entry
334325

335326

336327
def _freeze_response(response: Response) -> dict[str, Any]:
@@ -718,23 +709,43 @@ async def request_logger(request: Request, call_next):
718709
header_request_id = _normalize_request_id(
719710
request.headers.get("x-request-id")
720711
)
712+
explicit_request_id = (
713+
header_request_id or str(body_meta.get("request_id") or "").strip()
714+
)
721715
request.state.log_id = (
722-
header_request_id
723-
or str(body_meta.get("request_id") or "").strip()
716+
explicit_request_id
724717
or uuid.uuid4().hex[:12]
725718
)
726-
dedupe_key = _build_request_dedupe_key(request, path, raw_body)
727-
dedupe_owner, dedupe_entry = _begin_request_dedupe(
719+
dedupe_key = _build_request_dedupe_key(path, explicit_request_id)
720+
dedupe_action, dedupe_entry = _begin_request_dedupe(
728721
dedupe_key,
729722
str(getattr(request.state, "log_id", "") or ""),
723+
_build_request_payload_hash(raw_body),
730724
)
725+
dedupe_owner = dedupe_action == "owner"
731726
if isinstance(dedupe_entry, dict):
732727
request.state.log_id = str(
733728
dedupe_entry.get("request_id")
734729
or getattr(request.state, "log_id", "")
735730
or uuid.uuid4().hex[:12]
736731
)
737-
if dedupe_key and not dedupe_owner and isinstance(dedupe_entry, dict):
732+
if dedupe_action == "conflict":
733+
conflict_payload = {
734+
"error": {
735+
"message": "request_id already exists with a different payload",
736+
"type": "invalid_request_error",
737+
}
738+
}
739+
response = Response(
740+
content=json.dumps(conflict_payload, ensure_ascii=False).encode("utf-8"),
741+
status_code=409,
742+
media_type="application/json",
743+
)
744+
response.headers["X-Request-Id"] = str(
745+
getattr(request.state, "log_id", "") or ""
746+
)
747+
return response
748+
if dedupe_key and dedupe_action == "join" and isinstance(dedupe_entry, dict):
738749
replay = await _replay_deduped_response(dedupe_entry)
739750
if replay is not None:
740751
dedupe_replayed = True

0 commit comments

Comments
 (0)