-
Notifications
You must be signed in to change notification settings - Fork 161
Expand file tree
/
Copy pathagent_server.py
More file actions
3147 lines (2614 loc) · 113 KB
/
agent_server.py
File metadata and controls
3147 lines (2614 loc) · 113 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
NagaAgent独立服务 - 通过OpenClaw执行任务
提供意图识别和OpenClaw任务调度功能
"""
import asyncio
import logging
import httpx
import os
import sys
from typing import Dict, Any, Optional
from datetime import datetime
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from system.config import config, add_config_listener
from agentserver.telemetry_client import emit_local_telemetry
from agentserver.openclaw import get_openclaw_client, set_openclaw_config
from agentserver.openclaw.embedded_runtime import get_embedded_runtime, EmbeddedRuntime
# 配置日志
logger = logging.getLogger(__name__)
def _track_travel_task(session_id: str, task: asyncio.Task) -> None:
Modules.travel_tasks[session_id] = task
def _cleanup(done: asyncio.Task) -> None:
Modules.travel_tasks.pop(session_id, None)
try:
done.result()
except asyncio.CancelledError:
logger.info(f"[旅行] 任务已取消: {session_id}")
except Exception as exc:
logger.error(f"[旅行] 任务退出异常 [{session_id}]: {exc}")
task.add_done_callback(_cleanup)
def _spawn_travel_session(session_id: str, *, resumed: bool = False) -> bool:
existing = Modules.travel_tasks.get(session_id)
if existing and not existing.done():
return False
task = asyncio.create_task(_run_travel_session(session_id, resumed=resumed), name=f"travel:{session_id}")
_track_travel_task(session_id, task)
return True
async def _resume_open_travel_sessions() -> None:
from apiserver.travel_service import list_open_sessions
sessions = list_open_sessions()
restored = 0
for session in sessions:
if _spawn_travel_session(session.session_id, resumed=True):
restored += 1
if restored:
logger.info(f"[旅行] 已恢复 {restored} 个未完成探索")
def _mark_travel_sessions_interrupted(
*,
session_ids: Optional[list[str]] = None,
reason: str = "interrupted",
) -> list[str]:
from apiserver.travel_service import (
get_session_or_none,
interrupt_open_sessions,
mark_session_interrupted,
)
interrupted_ids: list[str] = []
if session_ids:
for session_id in session_ids:
session = get_session_or_none(session_id)
if session is None:
continue
updated = mark_session_interrupted(session, reason=reason)
if updated.session_id not in interrupted_ids:
interrupted_ids.append(updated.session_id)
return interrupted_ids
for session in interrupt_open_sessions(reason=reason):
interrupted_ids.append(session.session_id)
return interrupted_ids
async def _interrupt_travel_sessions(
*,
session_ids: Optional[list[str]] = None,
reason: str = "interrupted",
) -> list[str]:
interrupted_ids = _mark_travel_sessions_interrupted(session_ids=session_ids, reason=reason)
cancelled_tasks: list[asyncio.Task] = []
for session_id in interrupted_ids:
task = Modules.travel_tasks.get(session_id)
if task and not task.done():
task.cancel()
cancelled_tasks.append(task)
if cancelled_tasks:
await asyncio.gather(*cancelled_tasks, return_exceptions=True)
return interrupted_ids
async def _start_gateway_if_port_free(runtime: EmbeddedRuntime) -> bool:
"""主 Gateway 仅按主端口状态判定是否需要启动。"""
if runtime.gateway_running:
logger.info("当前进程中的 OpenClaw Gateway 已在运行,跳过启动")
return False
if runtime.is_gateway_port_in_use():
logger.info(f"端口 {config.openclaw.gateway_port} 已被占用,跳过 Gateway 启动")
return False
if runtime.has_gateway_process():
logger.info("检测到其他 OpenClaw Gateway 进程,但主端口空闲;继续尝试启动 20789 主 Gateway")
gw_ok = await runtime.start_gateway()
if gw_ok:
logger.info("OpenClaw Gateway 启动成功")
else:
logger.warning("OpenClaw Gateway 启动失败")
return gw_ok
def _on_config_changed() -> None:
"""配置变更监听器:自动更新 OpenClaw LLM 配置"""
try:
embedded_runtime = get_embedded_runtime()
if embedded_runtime.is_vendor_ready:
from agentserver.openclaw.llm_config_bridge import inject_naga_llm_config
inject_naga_llm_config()
logger.info("配置变更:已更新 OpenClaw LLM 配置")
except Exception as e:
logger.warning(f"配置变更时更新 OpenClaw 配置失败: {e}")
def _is_port_in_use(port: int) -> bool:
"""检测端口是否被占用"""
import socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(("127.0.0.1", port))
sock.close()
return result == 0
except Exception:
return False
async def _delayed_health_check():
"""延迟健康检查(等待所有服务启动)"""
await asyncio.sleep(6) # 虚拟机/低性能环境下启动更慢,适当延长缓冲时间
try:
from system.health_check import perform_startup_health_check
results, _summary = await perform_startup_health_check()
# API 在慢机器上可能晚于首次检查就绪,做一次延迟复检避免启动早期误报
api_result = results.get("api_server")
api_unhealthy = (
api_result is not None
and getattr(getattr(api_result, "status", None), "value", "") == "unhealthy"
)
if api_unhealthy:
logger.info("[HealthCheck] 检测到 API 尚未就绪,12 秒后执行一次复检")
await asyncio.sleep(12)
await perform_startup_health_check()
except Exception as e:
logger.error(f"启动时健康检查失败: {e}")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""FastAPI应用生命周期"""
# startup
try:
# 初始化 OpenClaw 客户端 - 三层回退策略
try:
from agentserver.openclaw import detect_openclaw, OpenClawConfig as ClientOpenClawConfig
from agentserver.openclaw.llm_config_bridge import (
ensure_openclaw_config,
inject_naga_llm_config,
ensure_hooks_allow_request_session_key,
ensure_gateway_local_mode,
ensure_hooks_path,
ensure_gateway_port,
)
embedded_runtime = get_embedded_runtime()
logger.info(f"OpenClaw 运行时模式: {embedded_runtime.runtime_mode}")
logger.info(f" vendor_root: {embedded_runtime.vendor_root}")
logger.info(f" node_path: {embedded_runtime.node_path}")
logger.info(f" is_vendor_ready: {embedded_runtime.is_vendor_ready}")
# ── Step 1: 确保 vendor 就绪 + 配置 + onboard ──
if not embedded_runtime.is_vendor_ready:
await embedded_runtime.ensure_vendor_ready()
if embedded_runtime.is_vendor_ready:
ensure_openclaw_config()
ensure_gateway_port(auto_create=False)
ensure_gateway_local_mode(auto_create=False)
ensure_hooks_path(auto_create=False)
ensure_hooks_allow_request_session_key(auto_create=False)
await embedded_runtime.ensure_onboarded()
inject_naga_llm_config()
# ── Step 1.5: 清理端口范围内的残留进程 ──
try:
from agentserver.openclaw.instance_manager import cleanup_port_range
cleaned = cleanup_port_range()
if cleaned:
await asyncio.sleep(1) # 等端口释放
except Exception as e:
logger.warning(f"端口清理失败(可忽略): {e}")
# ── Step 2: 启动 Gateway ──
await _start_gateway_if_port_free(embedded_runtime)
# 检测最终状态并初始化客户端
openclaw_status = detect_openclaw(check_connection=False)
if openclaw_status.installed:
openclaw_config = ClientOpenClawConfig(
gateway_url=openclaw_status.gateway_url or config.openclaw.gateway_url,
gateway_token=openclaw_status.gateway_token,
hooks_token=openclaw_status.hooks_token,
hooks_path=getattr(openclaw_status, "hooks_path", "/hooks"),
timeout=config.openclaw.timeout,
)
logger.info(f"OpenClaw 配置: {openclaw_config.gateway_url}")
logger.info(
f" - gateway_token: {'***' + openclaw_config.gateway_token[-8:] if openclaw_config.gateway_token else '未配置'}"
)
logger.info(
f" - hooks_token: {'***' + openclaw_config.hooks_token[-8:] if openclaw_config.hooks_token else '未配置'}"
)
else:
openclaw_config = ClientOpenClawConfig(
gateway_url=config.openclaw.gateway_url,
gateway_token=config.openclaw.token,
hooks_token=config.openclaw.token,
timeout=config.openclaw.timeout,
)
logger.info(f"OpenClaw 未检测到安装,使用配置文件: {openclaw_config.gateway_url}")
Modules.openclaw_client = get_openclaw_client(openclaw_config)
Modules.openclaw_client.restore_session()
logger.info(f"OpenClaw客户端初始化完成: {openclaw_config.gateway_url}")
except Exception as e:
logger.warning(f"OpenClaw客户端初始化失败(可选功能): {e}")
Modules.openclaw_client = None
# 初始化干员多实例管理器(通讯录模式:只恢复列表,不启动进程)
try:
from agentserver.openclaw.instance_manager import InstanceManager
embedded_runtime = get_embedded_runtime()
Modules.instance_manager = InstanceManager(embedded_runtime, primary_client=Modules.openclaw_client)
logger.info("干员多实例管理器已初始化")
# 从 manifest 恢复通讯录(不启动进程)
try:
agents = Modules.instance_manager.restore_from_manifest()
logger.info(f"通讯录恢复完成,共 {len(agents)} 个干员")
except Exception as e:
logger.warning(f"恢复通讯录失败(可忽略): {e}")
try:
await _resume_open_travel_sessions()
except Exception as e:
logger.warning(f"恢复探索任务失败(可忽略): {e}")
except Exception as e:
logger.warning(f"干员多实例管理器初始化失败(可选功能): {e}")
Modules.instance_manager = None
# 注册配置变更监听器
add_config_listener(_on_config_changed)
logger.debug("已注册 OpenClaw 配置变更监听器")
# 初始化军牌系统(统一后台任务调度)
try:
from agentserver.dogtag import (
create_dogtag_scheduler,
get_dogtag_registry,
load_heartbeat_config,
create_heartbeat_executor,
load_proactive_config,
create_proactive_analyzer,
create_proactive_trigger,
)
from agentserver.dogtag.duties.heartbeat_duty import create_heartbeat_duty
from agentserver.dogtag.duties.screen_vision_duty import create_screen_vision_duty
# 1. 初始化子组件
pv_config = load_proactive_config()
create_proactive_trigger()
create_proactive_analyzer(pv_config)
hb_config = load_heartbeat_config()
create_heartbeat_executor(hb_config)
# 2. 创建军牌调度器
Modules.dogtag_scheduler = create_dogtag_scheduler()
registry = get_dogtag_registry()
# 3. 注册职责
hb_tag, hb_exec = create_heartbeat_duty(hb_config)
registry.register(hb_tag, hb_exec)
sv_tag, sv_exec = create_screen_vision_duty(pv_config)
registry.register(sv_tag, sv_exec)
# 4. 启动调度器
await Modules.dogtag_scheduler.start()
logger.info("[DogTag] 军牌系统已启动")
except Exception as e:
logger.warning(f"[DogTag] 军牌系统初始化失败(可选功能): {e}")
Modules.dogtag_scheduler = None
logger.info("NagaAgent服务初始化完成")
# 执行启动时健康检查(延迟2秒等待所有服务就绪)
asyncio.create_task(_delayed_health_check())
except Exception as e:
logger.error(f"服务初始化失败: {e}")
raise
# 运行期
yield
# shutdown
try:
# 停止军牌系统
if Modules.dogtag_scheduler:
await Modules.dogtag_scheduler.stop()
logger.info("[DogTag] 军牌系统已停止")
# 关闭心跳执行器的 HTTP 客户端
from agentserver.dogtag import get_heartbeat_executor
hb_executor = get_heartbeat_executor()
if hb_executor:
await hb_executor.close()
# 停止所有干员子实例(主 Gateway 由下方 stop_gateway 处理)
interrupted_ids = await _interrupt_travel_sessions(reason="shutdown")
if interrupted_ids:
logger.info(f"[旅行] 服务关闭前已中断 {len(interrupted_ids)} 个探索任务")
if Modules.instance_manager:
await Modules.instance_manager.destroy_all()
logger.info("干员多实例已全部停止")
# 停止 Gateway 进程(内嵌模式)
embedded_runtime = get_embedded_runtime()
if embedded_runtime.gateway_running:
await embedded_runtime.stop_gateway()
logger.info("NagaAgent服务已关闭")
except Exception as e:
logger.error(f"服务关闭失败: {e}")
app = FastAPI(title="NagaAgent Server", version="1.0.0", lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class Modules:
"""全局模块管理器"""
openclaw_client = None
dogtag_scheduler = None # 军牌系统统一调度器
instance_manager = None # 干员多实例管理器
travel_tasks: Dict[str, asyncio.Task] = {}
def _now_iso() -> str:
"""获取当前时间ISO格式"""
return datetime.now().isoformat()
async def _process_openclaw_task(instruction: str, session_id: Optional[str] = None) -> Dict[str, Any]:
"""通过 OpenClaw 执行任务"""
try:
if not Modules.openclaw_client:
return {
"success": False,
"error": "OpenClaw 客户端未初始化",
"task_type": "openclaw",
"instruction": instruction,
}
logger.info(f"开始通过 OpenClaw 执行任务: {instruction}")
task = await Modules.openclaw_client.send_message(
message=instruction,
session_key=session_id,
name="NagaAgent",
)
logger.info(f"OpenClaw 任务完成: {instruction}, 状态: {task.status.value}")
return {
"success": task.status.value == "completed",
"result": task.to_dict(),
"task_type": "openclaw",
"instruction": instruction,
}
except Exception as e:
logger.error(f"OpenClaw 任务失败: {e}")
return {"success": False, "error": str(e), "task_type": "openclaw", "instruction": instruction}
# ============ API端点 ============
@app.get("/health")
async def health_check():
"""健康检查"""
return {
"status": "healthy",
"timestamp": _now_iso(),
"modules": {
"openclaw": Modules.openclaw_client is not None,
"dogtag": Modules.dogtag_scheduler is not None,
},
}
@app.get("/health/full")
async def full_health_check():
"""完整健康检查(包括所有服务)"""
from system.health_check import get_health_checker
checker = get_health_checker()
results = await checker.check_all()
summary = checker.get_summary(results)
# 转换为可序列化格式
results_dict = {}
for service_name, result in results.items():
results_dict[service_name] = {
"status": result.status.value,
"message": result.message,
"checks": result.checks,
"details": result.details,
"latency_ms": result.latency_ms,
}
return {
"summary": summary,
"services": results_dict,
"timestamp": _now_iso(),
}
# ============ OpenClaw 集成 API ============
@app.get("/openclaw/health")
async def openclaw_health_check():
"""检查 OpenClaw Gateway 健康状态"""
if not Modules.openclaw_client:
return {"success": False, "status": "not_configured", "message": "OpenClaw 客户端未配置"}
try:
health = await Modules.openclaw_client.health_check()
return {"success": True, "health": health}
except Exception as e:
logger.error(f"OpenClaw 健康检查失败: {e}")
return {"success": False, "status": "error", "error": str(e)}
@app.post("/openclaw/config")
async def configure_openclaw(payload: Dict[str, Any]):
"""配置 OpenClaw 连接
请求体:
- gateway_url: Gateway 地址 (默认从 config.openclaw.gateway_url 读取)
- token: 认证 token
- timeout: 超时时间
- default_model: 默认模型
- default_channel: 默认通道
"""
try:
from agentserver.openclaw import OpenClawConfig as ClientOpenClawConfig
openclaw_config = ClientOpenClawConfig(
gateway_url=payload.get("gateway_url", config.openclaw.gateway_url),
token=payload.get("token"),
hooks_path=payload.get("hooks_path", "/hooks"),
timeout=payload.get("timeout", config.openclaw.timeout),
default_model=payload.get("default_model"),
default_channel=payload.get("default_channel", "last"),
)
set_openclaw_config(openclaw_config)
Modules.openclaw_client = get_openclaw_client()
logger.info(f"OpenClaw 配置更新: {openclaw_config.gateway_url}")
return {"success": True, "message": "OpenClaw 配置已更新", "gateway_url": openclaw_config.gateway_url}
except Exception as e:
logger.error(f"OpenClaw 配置失败: {e}")
raise HTTPException(500, f"配置失败: {e}")
@app.post("/openclaw/send")
async def openclaw_send_message(payload: Dict[str, Any]):
"""
发送消息给 OpenClaw Agent
使用 POST /hooks/agent 端点
文档: https://docs.openclaw.ai/automation/webhook
请求体:
- message: 消息内容 (必需)
- task_id: 外部任务ID(可选;用于与调度器task_id对齐)
- session_key: 会话标识 (可选)
- name: hook 名称 (可选)
- channel: 消息通道 (可选)
- to: 接收者 (可选)
- model: 模型名称 (可选)
- wake_mode: 唤醒模式 now/next-heartbeat (可选)
- deliver: 是否投递 (可选)
- timeout_seconds: 等待结果超时时间,默认120秒 (可选)
"""
if not Modules.openclaw_client:
raise HTTPException(503, "OpenClaw 客户端未就绪")
message = payload.get("message")
if not message:
raise HTTPException(400, "message 不能为空")
# 如果提供了 task_id 但未提供 session_key,则默认使用 task_id 派生稳定会话键,便于按任务查看中间过程
task_id = payload.get("task_id")
session_key = payload.get("session_key")
if task_id and not session_key:
session_key = f"naga:task:{task_id}"
try:
task = await Modules.openclaw_client.send_message(
message=message,
session_key=session_key,
name=payload.get("name"),
channel=payload.get("channel"),
to=payload.get("to"),
model=payload.get("model"),
wake_mode=payload.get("wake_mode", "now"),
deliver=payload.get("deliver", False),
timeout_seconds=payload.get("timeout_seconds", 120),
task_id=task_id,
)
return {
"success": task.status.value != "failed",
"task": task.to_dict(),
"reply": task.result.get("reply") if task.result else None,
"replies": task.result.get("replies") if task.result else None,
"error": task.error,
}
except Exception as e:
logger.error(f"OpenClaw 发送消息失败: {e}")
raise HTTPException(500, f"发送失败: {e}")
@app.post("/openclaw/wake")
async def openclaw_wake(payload: Dict[str, Any]):
"""
触发 OpenClaw 系统事件
使用 POST /hooks/wake 端点
文档: https://docs.openclaw.ai/automation/webhook
请求体:
- text: 事件描述 (必需)
- mode: 触发模式 now/next-heartbeat (可选)
"""
if not Modules.openclaw_client:
raise HTTPException(503, "OpenClaw 客户端未就绪")
text = payload.get("text")
if not text:
raise HTTPException(400, "text 不能为空")
try:
result = await Modules.openclaw_client.wake(text=text, mode=payload.get("mode", "now"))
return result
except Exception as e:
logger.error(f"OpenClaw 触发事件失败: {e}")
raise HTTPException(500, f"触发失败: {e}")
# ============ 干员实例管理 ============
@app.get("/openclaw/instances")
async def list_instances():
"""列出所有干员(通讯录),不管进程是否在跑"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
return {"instances": Modules.instance_manager.list_agents()}
# ── 新通讯录 API ──
@app.get("/openclaw/agents")
async def list_agents():
"""列出通讯录中所有干员(不管进程是否在跑)"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
return {"agents": Modules.instance_manager.list_agents()}
@app.post("/openclaw/agents")
async def create_agent(payload: Dict[str, Any]):
"""新建干员(写 manifest + 创建目录,不启动进程)"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
name = payload.get("name")
character_template = (payload.get("character_template") or payload.get("characterTemplate") or "").strip() or None
engine = (payload.get("engine") or "openclaw").strip() or "openclaw"
telemetry_props = {
"engine": engine,
"name_length": len(str(name or "")),
"has_character_template": bool(character_template),
}
try:
inst = Modules.instance_manager.create_agent(
name,
character_template=character_template,
engine=engine,
)
await emit_local_telemetry(
"agent_create",
{
**telemetry_props,
"created_agent_id": inst.id,
},
agent_id=inst.id,
)
return {
"id": inst.id,
"name": inst.name,
"running": inst.running,
"character_template": inst.character_template,
"engine": inst.engine,
}
except Exception as e:
await emit_local_telemetry(
"agent_create_fail",
{
**telemetry_props,
"error": e,
},
)
logger.error(f"创建干员失败: {e}")
raise HTTPException(500, f"创建失败: {e}")
@app.get("/openclaw/agents/{agent_id}/settings")
async def get_agent_settings(agent_id: str):
"""读取干员设置(名字 / 引擎 / 人设 / SOUL.md)。"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
settings = Modules.instance_manager.get_agent_settings(agent_id)
if settings is None:
raise HTTPException(404, "干员不存在")
return settings
@app.put("/openclaw/agents/{agent_id}/settings")
async def update_agent_settings(agent_id: str, payload: Dict[str, Any]):
"""更新干员设置(名字 / 引擎 / 人设 / SOUL.md)。"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
name = payload.get("name")
if name is not None:
name = str(name).strip()
if not name:
raise HTTPException(400, "name 不能为空")
engine = payload.get("engine")
if engine is not None:
engine = str(engine).strip()
update_character_template = "character_template" in payload or "characterTemplate" in payload
character_template = payload.get("character_template")
if character_template is None and "characterTemplate" in payload:
character_template = payload.get("characterTemplate")
if character_template is not None:
character_template = str(character_template).strip() or None
update_soul_content = "soul_content" in payload or "soulContent" in payload
soul_content = payload.get("soul_content")
if soul_content is None and "soulContent" in payload:
soul_content = payload.get("soulContent")
if soul_content is not None:
soul_content = str(soul_content)
telemetry_props = {
"changed_fields": sorted(
field
for field, changed in (
("name", name is not None),
("engine", engine is not None),
("character_template", update_character_template),
("soul_content", update_soul_content),
)
if changed
),
"name_length": len(name or "") if name is not None else None,
"engine": engine,
"has_character_template": bool(character_template) if update_character_template else None,
"soul_content_chars": len(soul_content or "") if update_soul_content else None,
}
try:
settings = await Modules.instance_manager.update_agent_settings(
agent_id,
name=name,
character_template=character_template,
update_character_template=update_character_template,
engine=engine,
soul_content=soul_content,
update_soul_content=update_soul_content,
)
except ValueError as exc:
await emit_local_telemetry(
"agent_settings_update_fail",
{
**telemetry_props,
"error": str(exc),
"status_code": 400,
},
agent_id=agent_id,
)
raise HTTPException(400, str(exc))
except Exception as exc:
await emit_local_telemetry(
"agent_settings_update_fail",
{
**telemetry_props,
"error": exc,
},
agent_id=agent_id,
)
logger.error(f"更新干员设置失败: {exc}")
raise HTTPException(500, f"更新失败: {exc}")
if settings is None:
await emit_local_telemetry(
"agent_settings_update_fail",
{
**telemetry_props,
"status_code": 404,
"error": "干员不存在",
},
agent_id=agent_id,
)
raise HTTPException(404, "干员不存在")
await emit_local_telemetry("agent_settings_update", telemetry_props, agent_id=agent_id)
return settings
@app.delete("/openclaw/agents/{agent_id}")
async def delete_agent(agent_id: str, delete_data: bool = True):
"""从通讯录删除干员 + 停止进程 + 可选删数据"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
try:
await Modules.instance_manager.destroy_agent_async(agent_id, delete_data=delete_data)
await emit_local_telemetry(
"agent_delete",
{
"delete_data": bool(delete_data),
},
agent_id=agent_id,
)
return {"success": True}
except Exception as e:
await emit_local_telemetry(
"agent_delete_fail",
{
"delete_data": bool(delete_data),
"error": e,
},
agent_id=agent_id,
)
logger.error(f"删除干员失败: {e}")
raise HTTPException(500, f"删除失败: {e}")
@app.put("/openclaw/agents/{agent_id}/name")
async def rename_agent(agent_id: str, payload: Dict[str, Any]):
"""重命名干员(通讯录 + 目录)"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
new_name = (payload.get("name") or "").strip()
if not new_name:
await emit_local_telemetry(
"agent_rename_fail",
{
"name_length": 0,
"status_code": 400,
"error": "name 不能为空",
},
agent_id=agent_id,
)
raise HTTPException(400, "name 不能为空")
ok = Modules.instance_manager.rename_agent(agent_id, new_name)
if not ok:
await emit_local_telemetry(
"agent_rename_fail",
{
"name_length": len(new_name),
"status_code": 404,
"error": "干员不存在",
},
agent_id=agent_id,
)
raise HTTPException(404, "干员不存在")
await emit_local_telemetry(
"agent_rename",
{
"name_length": len(new_name),
},
agent_id=agent_id,
)
return {"success": True, "name": new_name}
@app.get("/openclaw/agents/{agent_id}/history")
async def get_agent_history(agent_id: str, limit: int = 50):
"""获取干员对话历史(自动 ensure_running 启动进程)"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
inst = Modules.instance_manager.get_instance(agent_id)
if inst is None:
raise HTTPException(404, "干员不存在")
if inst.engine != "openclaw":
return {"messages": []}
# 按需启动进程(需要 Gateway 运行才能查询 session 历史)
if not inst.running or not inst.client:
try:
inst = await Modules.instance_manager.ensure_running(agent_id)
except Exception as e:
logger.warning(f"启动干员 [{agent_id}] 以获取历史失败: {e}")
return {"messages": []}
session_key = inst.client._default_session_key
if not session_key:
logger.warning(f"干员 [{inst.name}] session_key 为空,无法获取历史")
return {"messages": []}
try:
logger.info(f"获取干员 [{inst.name}] 历史: session_key={session_key}, limit={limit}")
history = await inst.client.get_local_session_history(
session_key=session_key,
limit=limit,
)
if history.get("success"):
messages = [
msg for msg in history.get("messages", [])
if isinstance(msg, dict) and msg.get("role") in ("user", "assistant")
]
logger.info(f"解析后有效消息: {len(messages)} 条")
return {"messages": messages}
logger.warning(f"获取干员 [{inst.name}] 历史失败: {history.get('error', 'unknown')}")
return {"messages": []}
except Exception as e:
logger.warning(f"获取干员 [{inst.name}] 历史失败: {e}", exc_info=True)
return {"messages": []}
@app.get("/openclaw/agents/{agent_id}/runtime")
async def get_agent_runtime(agent_id: str, wake: bool = False):
"""解析指定干员当前运行时;wake=true 时若未启动则按需唤醒。"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
try:
runtime = await Modules.instance_manager.resolve_runtime(agent_id, wake=wake)
return {"success": True, "runtime": runtime}
except RuntimeError as exc:
detail = str(exc)
status = 404 if "不存在" in detail else 409
raise HTTPException(status, detail)
except Exception as exc:
logger.error(f"解析干员运行时失败 [{agent_id}]: {exc}")
raise HTTPException(500, f"解析运行时失败: {exc}")
@app.post("/openclaw/agents/{agent_id}/send")
async def send_to_agent(agent_id: str, payload: Dict[str, Any]):
"""向指定干员发送消息(同步等待结果),内部自动 ensure_running。"""
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
message = payload.get("message", "")
if not message:
raise HTTPException(400, "message 不能为空")
timeout = int(payload.get("timeout_seconds", 120) or 120)
session_key = payload.get("session_key")
name = payload.get("name")
result = await Modules.instance_manager.send_message(
agent_id,
message,
timeout=timeout,
session_key=session_key,
name=name,
)
if not result.get("success", False) and "error" in result:
logger.warning(f"发送消息到干员 [{agent_id}] 失败: {result.get('error')}")
return result
@app.post("/openclaw/agents/{agent_id}/stream")
async def stream_to_agent(agent_id: str, payload: Dict[str, Any]):
"""向干员发送消息(SSE 流式输出),内部自动 ensure_running"""
from fastapi.responses import StreamingResponse
import json as _json
if not Modules.instance_manager:
raise HTTPException(503, "实例管理器未就绪")
message = payload.get("message", "")
if not message:
raise HTTPException(400, "message 不能为空")
timeout = payload.get("timeout_seconds", 120)
async def event_stream():
async for chunk in Modules.instance_manager.send_message_stream(agent_id, message, timeout):
yield f"data: {_json.dumps(chunk, ensure_ascii=False)}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
# ============ 本地搜索代理(拦截 OpenClaw web_search) ============
_search_http_client: Optional[httpx.AsyncClient] = None
def _get_search_client() -> httpx.AsyncClient:
"""搜索代理共享 httpx 客户端"""
global _search_http_client
if _search_http_client is None or _search_http_client.is_closed:
_search_http_client = httpx.AsyncClient(timeout=30.0, proxy=None)
return _search_http_client
async def _local_search_proxy(args: Dict[str, Any]) -> Dict[str, Any]:
"""
本地搜索代理:拦截 web_search 请求,走 Naga 或 Brave,不转发给 OpenClaw。
返回 MCP 工具结果格式 { success, result: { content: [...] } }
"""
query = args.get("query", "") or args.get("q", "")
count = args.get("count", 10) or args.get("limit", 10)
freshness = args.get("freshness")
if not query:
return {"success": False, "error": "缺少搜索关键词 (query)"}
try:
# 优先级1: 已登录 Naga → NagaBusiness 搜索代理
from apiserver import naga_auth
if naga_auth.is_authenticated():
token = naga_auth.get_access_token()
params: Dict[str, Any] = {"q": query, "count": count}
if freshness:
params["freshness"] = freshness
client = _get_search_client()
resp = await client.post(
naga_auth.NAGA_MODEL_URL + "/tools/search",
json=params,
headers={"Authorization": f"Bearer {token}"},
)