Skip to content

Commit 8ae33ba

Browse files
author
Eurekaxun
committed
fix(openclaw-plugin): add defensive re-spawn for OpenViking subprocess after Gateway restart
1 parent 8c57069 commit 8ae33ba

1 file changed

Lines changed: 85 additions & 5 deletions

File tree

examples/openclaw-plugin/index.ts

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
import {
2323
IS_WIN,
2424
waitForHealth,
25+
quickHealthCheck,
2526
quickRecallPrecheck,
2627
withTimeout,
2728
resolvePythonCommand,
@@ -460,14 +461,17 @@ const contextEnginePlugin = {
460461
}),
461462
async execute(_toolCallId: string, params: Record<string, unknown>) {
462463
const archiveId = String((params as { archiveId?: string }).archiveId ?? "").trim();
464+
const sessionId = ctx.sessionId ?? "";
465+
api.logger.info?.(`openviking: ov_archive_expand invoked (archiveId=${archiveId || "(empty)"}, sessionId=${sessionId || "(empty)"})`);
466+
463467
if (!archiveId) {
468+
api.logger.warn?.(`openviking: ov_archive_expand missing archiveId`);
464469
return {
465470
content: [{ type: "text", text: "Error: archiveId is required." }],
466471
details: { error: "missing_param", param: "archiveId" },
467472
};
468473
}
469474

470-
const sessionId = ctx.sessionId ?? "";
471475
if (!sessionId) {
472476
return {
473477
content: [{ type: "text", text: "Error: no active session." }],
@@ -495,6 +499,7 @@ const contextEnginePlugin = {
495499
.map((m: OVMessage) => formatMessageFaithful(m))
496500
.join("\n\n");
497501

502+
api.logger.info?.(`openviking: ov_archive_expand expanded ${detail.archive_id}, messages=${detail.messages.length}, chars=${body.length}, sessionId=${sessionId}`);
498503
return {
499504
content: [{ type: "text", text: `${header}\n${body}` }],
500505
details: {
@@ -506,6 +511,7 @@ const contextEnginePlugin = {
506511
};
507512
} catch (err) {
508513
const msg = err instanceof Error ? err.message : String(err);
514+
api.logger.warn?.(`openviking: ov_archive_expand failed (archiveId=${archiveId}, sessionId=${sessionId}): ${msg}`);
509515
return {
510516
content: [{ type: "text", text: `Failed to expand ${archiveId}: ${msg}` }],
511517
details: { error: msg, archiveId, sessionId },
@@ -794,10 +800,8 @@ const contextEnginePlugin = {
794800
localProcess = null;
795801
localClientCache.delete(localCacheKey);
796802
}
797-
if (code != null && code !== 0 || signal) {
798-
const out = formatStderrOutput();
799-
api.logger.warn(`openviking: subprocess exited (code=${code}, signal=${signal})${out}`);
800-
}
803+
const out = formatStderrOutput();
804+
api.logger.warn(`openviking: subprocess exited (code=${code}, signal=${signal})${out}`);
801805
});
802806
try {
803807
await waitForHealth(baseUrl, timeoutMs, intervalMs);
@@ -819,6 +823,82 @@ const contextEnginePlugin = {
819823
}
820824
throw err;
821825
}
826+
} else if (cfg.mode === "local") {
827+
// Defensive re-spawn: if we're not the designated spawner but there's
828+
// no valid local process, trigger a fresh spawn to recover from
829+
// scenarios like Gateway force-restart where the child process was
830+
// orphaned or exited silently.
831+
const cached = localClientCache.get(localCacheKey);
832+
const processAlive = cached?.process && cached.process.exitCode === null && !cached.process.killed;
833+
if (!processAlive) {
834+
const healthOk = await quickHealthCheck(`http://127.0.0.1:${cfg.port}`, 2000);
835+
if (!healthOk) {
836+
api.logger.warn(
837+
`openviking: no valid local process detected (isSpawner=false), triggering defensive re-spawn`,
838+
);
839+
const timeoutMs = 60_000;
840+
const intervalMs = 500;
841+
const actualPort = await prepareLocalPort(cfg.port, api.logger);
842+
const baseUrl = `http://127.0.0.1:${actualPort}`;
843+
const pythonCmd = resolvePythonCommand(api.logger);
844+
const pathSep = IS_WIN ? ";" : ":";
845+
const env = {
846+
...process.env,
847+
PYTHONUNBUFFERED: "1",
848+
PYTHONWARNINGS: "ignore::RuntimeWarning",
849+
OPENVIKING_CONFIG_FILE: cfg.configPath,
850+
OPENVIKING_START_CONFIG: cfg.configPath,
851+
OPENVIKING_START_HOST: "127.0.0.1",
852+
OPENVIKING_START_PORT: String(actualPort),
853+
...(process.env.OPENVIKING_GO_PATH && { PATH: `${process.env.OPENVIKING_GO_PATH}${pathSep}${process.env.PATH || ""}` }),
854+
...(process.env.OPENVIKING_GOPATH && { GOPATH: process.env.OPENVIKING_GOPATH }),
855+
...(process.env.OPENVIKING_GOPROXY && { GOPROXY: process.env.OPENVIKING_GOPROXY }),
856+
};
857+
const runpyCode = `import sys,os,warnings; warnings.filterwarnings('ignore', category=RuntimeWarning, message='.*sys.modules.*'); sys.argv=['openviking.server.bootstrap','--config',os.environ['OPENVIKING_START_CONFIG'],'--host',os.environ.get('OPENVIKING_START_HOST','127.0.0.1'),'--port',os.environ['OPENVIKING_START_PORT']]; import runpy, importlib.util; spec=importlib.util.find_spec('openviking.server.bootstrap'); (runpy.run_path(spec.origin, run_name='__main__') if spec and getattr(spec,'origin',None) else runpy.run_module('openviking.server.bootstrap', run_name='__main__', alter_sys=True))`;
858+
const child = spawn(
859+
pythonCmd,
860+
["-c", runpyCode],
861+
{ env, cwd: IS_WIN ? tmpdir() : "/tmp", stdio: ["ignore", "pipe", "pipe"] },
862+
);
863+
localProcess = child;
864+
child.on("error", (err: Error) => api.logger.warn(`openviking: local server error (re-spawn): ${String(err)}`));
865+
child.stderr?.on("data", (chunk: Buffer) => {
866+
api.logger.debug?.(`[openviking-respawn] ${String(chunk).trim()}`);
867+
});
868+
child.on("exit", (code: number | null, signal: string | null) => {
869+
if (localProcess === child) {
870+
localProcess = null;
871+
localClientCache.delete(localCacheKey);
872+
}
873+
api.logger.warn(`openviking: re-spawned subprocess exited (code=${code}, signal=${signal})`);
874+
});
875+
try {
876+
await waitForHealth(baseUrl, timeoutMs, intervalMs);
877+
const client = new OpenVikingClient(baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs);
878+
localClientCache.set(localCacheKey, { client, process: child });
879+
if (resolveLocalClient) {
880+
resolveLocalClient(client);
881+
rejectLocalClient = null;
882+
}
883+
api.logger.info(
884+
`openviking: local server re-spawned successfully (${baseUrl}, config: ${cfg.configPath})`,
885+
);
886+
} catch (err) {
887+
localProcess = null;
888+
child.kill("SIGTERM");
889+
markLocalUnavailable("re-spawn failed", err);
890+
api.logger.warn(`openviking: defensive re-spawn failed: ${String(err)}`);
891+
throw err;
892+
}
893+
} else {
894+
api.logger.info(`openviking: local process healthy on port ${cfg.port} (isSpawner=false)`);
895+
}
896+
} else {
897+
await (await getClient()).healthCheck().catch(() => {});
898+
api.logger.info(
899+
`openviking: initialized via cache (url: ${cfg.baseUrl}, targetUri: ${cfg.targetUri})`,
900+
);
901+
}
822902
} else {
823903
await (await getClient()).healthCheck().catch(() => {});
824904
api.logger.info(

0 commit comments

Comments
 (0)