Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions dispatch/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,62 @@ function agentFromSessionKey(sessionKey) {
return 'main';
}

function getSessionJsonlMtimeMs(agent, sessionId) {
const jsonlPath = getSessionJsonlPath(agent, sessionId);
if (!jsonlPath) return null;
try {
return statSync(jsonlPath).mtimeMs;
} catch {
return null;
}
}

function getJsonlPendingToolReason(entries) {
if (!Array.isArray(entries) || entries.length === 0) return null;
const last = entries[entries.length - 1];

if (last?.role === 'assistant') {
const content = Array.isArray(last.content) ? last.content : [];
const toolUse = content.find(c => c?.type === 'tool_use');
if (toolUse) {
return `last assistant entry has tool_use (${toolUse.name || 'unknown'}) -- awaiting tool result`;
}
if (last.type === 'tool_use') {
return `last entry is tool_use (${last.name || 'unknown'}) -- awaiting tool result`;
}
}

if (last?.role === 'user') {
const content = Array.isArray(last.content) ? last.content : [];
if (content.some(c => c?.type === 'tool_result')) {
return 'last entry is tool_result (tool executed, awaiting assistant reply)';
}
}

if (last?.type === 'tool_result') {
return 'last entry is tool_result (tool executed, awaiting assistant reply)';
}

return null;
}

function getJsonlTerminalReplyReason(entries) {
if (!Array.isArray(entries) || entries.length === 0) return null;
const terminalReply = extractTerminalAssistantReplyFromEntries(entries);
if (!terminalReply) return null;

for (let i = entries.length - 1; i >= 0; i--) {
const entry = entries[i];
if (entry?.role === 'assistant') {
return entry.stop_reason === 'end_turn'
? 'terminal assistant reply observed in JSONL'
: null;
}
}

return null;
}

// -- Gateway Session State Check ------------------------------

/**
Expand Down Expand Up @@ -647,9 +703,35 @@ function checkSessionDone(sessionKey, sessionsStore, thresholdMs, sessionEverFou
}

// 2. Session exists in store, check idle time.
const entry = sessionsStore[sessionKey];
const lastActivity = entry.updatedAt || 0;
const silenceMs = Date.now() - lastActivity;
const entry = sessionsStore[sessionKey];
const agent = agentFromSessionKey(sessionKey) || 'main';
const updatedAtMs = toTimestampMs(entry.updatedAt);
const lastActivityAtMs = toTimestampMs(entry.lastActivityAt);
const jsonlMtimeMs = getSessionJsonlMtimeMs(agent, entry.sessionId);
const activityTimes = [updatedAtMs, lastActivityAtMs, jsonlMtimeMs].filter(t => typeof t === 'number');
const lastActivity = activityTimes.length ? Math.max(...activityTimes) : null;
const silenceMs = lastActivity === null ? Infinity : Date.now() - lastActivity;

if (entry.sessionId) {
const entries = readJsonlTailEntries(entry.sessionId, agent, 20);
const pendingToolReason = getJsonlPendingToolReason(entries);
if (pendingToolReason) {
return {
shouldResolve: false,
reason: `session JSONL shows pending work: ${pendingToolReason}`,
lastActivity,
};
}

const terminalReplyReason = getJsonlTerminalReplyReason(entries);
if (terminalReplyReason) {
return {
shouldResolve: false,
reason: `${terminalReplyReason}; watcher/result path should deliver completion`,
lastActivity,
};
}
}

if (silenceMs >= thresholdMs) {
return {
Expand Down
24 changes: 20 additions & 4 deletions dispatch/watcher.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,14 +1120,18 @@ function runOnceAndExit() {
}

const ageMs = status.liveness?.ageMs;
const idleResultCheckMs = getCurrentLivenessPolicy().idleProbeMs;
const livenessPolicy = getCurrentLivenessPolicy();
const idleResultCheckMs = livenessPolicy.idleProbeMs;
const idleFailureMs = livenessPolicy.idleFailureMs;
if (ageMs != null && ageMs >= idleResultCheckMs) {
const result = dispatch('result', ['--label', label]);
if (hasStructuredCompletion(result)) {
deliverResult(label, result?.lastReply || null, null, result?.completion || null);
}

const stallReason = getRunningSessionStallReason(status, idleResultCheckMs);
const stallReason = ageMs >= idleFailureMs
? getRunningSessionStallReason(status, idleFailureMs)
: null;
if (stallReason) {
process.stderr.write(`[watcher] [${label}] ${stallReason}\n`);
markLabelError(label, stallReason);
Expand Down Expand Up @@ -1504,14 +1508,18 @@ while (Date.now() < deadline) {
// while this watcher's lastPing heartbeat is fresh (written every 60s);
// this path handles normal completion before the ping goes stale.
const ageMs = status.liveness?.ageMs;
const idleResultCheckMs = getCurrentLivenessPolicy().idleProbeMs;
const livenessPolicy = getCurrentLivenessPolicy();
const idleResultCheckMs = livenessPolicy.idleProbeMs;
const idleFailureMs = livenessPolicy.idleFailureMs;
if (ageMs != null && ageMs >= idleResultCheckMs) {
const result = dispatch('result', ['--label', label]);
if (hasStructuredCompletion(result)) {
deliverResult(label, result?.lastReply || null, null, result?.completion || null);
}

const stallReason = getRunningSessionStallReason(status, idleResultCheckMs);
const stallReason = ageMs >= idleFailureMs
? getRunningSessionStallReason(status, idleFailureMs)
: null;
if (stallReason) {
process.stderr.write(`[watcher] [${label}] ${stallReason}\n`);
markLabelError(label, stallReason);
Expand All @@ -1530,6 +1538,14 @@ while (Date.now() < deadline) {
// Timed out -- try one last result check
const finalResult = dispatch('result', ['--label', label]);
const finalStatus = dispatch('status', ['--label', label]);
if (hasStructuredCompletion(finalResult)) {
deliverResult(
label,
finalResult?.lastReply || null,
finalStatus?.summary || null,
finalResult?.completion || finalStatus?.completion || null,
);
}
if (finalStatus?.status === 'done') {
const rc = getRetryCount(label);
if (rc > 0) setRetryCount(label, 0);
Expand Down
145 changes: 141 additions & 4 deletions tests/delivery-fixes.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ test('watcher --once detects stale sessions despite fresh watcher lastPing', ()
writeFileSync(join(sessionsDir, 'sessions.json'), JSON.stringify({
[sessionKey]: {
sessionId,
updatedAt: new Date(Date.now() - 77 * 60 * 1000).toISOString(),
updatedAt: new Date(Date.now() - 130 * 60 * 1000).toISOString(),
model: 'test',
},
}) + '\n');
Expand All @@ -372,7 +372,7 @@ test('watcher --once detects stale sessions despite fresh watcher lastPing', ()
role: 'assistant',
content: [{ type: 'text', text: 'Still working.' }],
}) + '\n');
const staleDate = new Date(Date.now() - 77 * 60 * 1000);
const staleDate = new Date(Date.now() - 130 * 60 * 1000);
utimesSync(jsonlPath, staleDate, staleDate);
writeFileSync(labelsPath, JSON.stringify({
[label]: {
Expand All @@ -386,14 +386,14 @@ test('watcher --once detects stale sessions despite fresh watcher lastPing', ()
}) + '\n');
writeFileSync(mockDispatch, `
const sub = process.argv[2];
if (sub === 'status') {
if (sub === 'status') {
process.stdout.write(JSON.stringify({
ok: true,
label: ${JSON.stringify(label)},
status: 'running',
sessionKey: ${JSON.stringify(sessionKey)},
agent: 'main',
liveness: { ageMs: ${77 * 60 * 1000} }
liveness: { ageMs: ${130 * 60 * 1000} }
}) + '\\n');
} else if (sub === 'result') {
process.stdout.write(JSON.stringify({ ok: true, status: 'running' }) + '\\n');
Expand Down Expand Up @@ -426,6 +426,143 @@ if (sub === 'status') {
}
});

test('watcher --once probes high-thinking idle sessions without failing before idle failure window', () => {
const tempDir = mkdtempSync(join(tmpdir(), 'watcher-once-high-probe-'));
const labelsPath = join(tempDir, 'labels.json');
const mockDispatch = join(tempDir, 'mock-dispatch.mjs');
const label = 'quick-poll-high-probe';
const sessionKey = 'agent:main:subagent:high-probe-session';
const sessionId = 'high-probe-jsonl-id';
const watcherPath = join(__dirname, '..', 'dispatch', 'watcher.mjs');
const sessionsDir = join(tempDir, '.openclaw', 'agents', 'main', 'sessions');
const staleMs = 16 * 60 * 1000;

try {
mkdirSync(sessionsDir, { recursive: true });
writeFileSync(join(sessionsDir, 'sessions.json'), JSON.stringify({
[sessionKey]: {
sessionId,
updatedAt: new Date(Date.now() - staleMs).toISOString(),
model: 'test',
},
}) + '\n');
const jsonlPath = join(sessionsDir, `${sessionId}.jsonl`);
writeFileSync(jsonlPath, JSON.stringify({
role: 'assistant',
content: [{ type: 'text', text: 'Still working.' }],
}) + '\n');
const staleDate = new Date(Date.now() - staleMs);
utimesSync(jsonlPath, staleDate, staleDate);
writeFileSync(labelsPath, JSON.stringify({
[label]: {
sessionKey,
status: 'running',
agent: 'main',
spawnedAt: new Date(Date.now() - staleMs).toISOString(),
timeoutSeconds: 3600,
thinking: 'high',
lastPing: new Date().toISOString(),
},
}) + '\n');
writeFileSync(mockDispatch, `
const sub = process.argv[2];
if (sub === 'status') {
process.stdout.write(JSON.stringify({
ok: true,
label: ${JSON.stringify(label)},
status: 'running',
sessionKey: ${JSON.stringify(sessionKey)},
agent: 'main',
liveness: { ageMs: ${staleMs} }
}) + '\\n');
} else if (sub === 'result') {
process.stdout.write(JSON.stringify({ ok: true, status: 'running' }) + '\\n');
} else {
process.stdout.write(JSON.stringify({ ok: true }) + '\\n');
}
`);

const run = spawnSync(process.execPath, [
watcherPath, '--label', label, '--timeout', '3600', '--poll-interval', '20', '--once',
], {
env: {
...process.env,
HOME: tempDir,
DISPATCH_INDEX_PATH: mockDispatch,
DISPATCH_LABELS_PATH: labelsPath,
OPENCLAW_SCHEDULER_NOTIFY_DISABLED: '1',
},
encoding: 'utf8',
timeout: 5000,
});
const labels = JSON.parse(readFileSync(labelsPath, 'utf8'));

assert.equal(run.status, 0);
assert.equal((run.stdout || '').trim(), '');
assert.match(run.stderr || '', /WATCHER_PENDING/);
assert.equal(labels[label].status, 'running');
} finally {
rmSync(tempDir, { recursive: true, force: true });
}
});

test('dispatch status keeps running when JSONL is fresher than sessions store', () => {
const tempDir = mkdtempSync(join(tmpdir(), 'dispatch-jsonl-live-'));
const labelsPath = join(tempDir, 'labels.json');
const sessionKey = 'agent:main:subagent:jsonl-live';
const sessionId = 'jsonl-live-session';
const sessionsDir = join(tempDir, '.openclaw', 'agents', 'main', 'sessions');
const dispatchIndex = join(__dirname, '..', 'dispatch', 'index.mjs');
const oldIso = new Date(Date.now() - 30 * 60_000).toISOString();

try {
mkdirSync(sessionsDir, { recursive: true });
writeFileSync(labelsPath, JSON.stringify({
'jsonl-live': {
sessionKey,
runId: 'run-jsonl-live',
agent: 'main',
status: 'running',
spawnedAt: oldIso,
timeoutSeconds: 600,
updatedAt: oldIso,
},
}, null, 2) + '\n');
writeFileSync(join(sessionsDir, 'sessions.json'), JSON.stringify({
[sessionKey]: {
sessionId,
updatedAt: Date.now() - 30 * 60_000,
sessionStartedAt: oldIso,
},
}, null, 2) + '\n');
writeFileSync(
join(sessionsDir, `${sessionId}.jsonl`),
JSON.stringify({ role: 'assistant', content: [{ type: 'text', text: 'Still working.' }], stop_reason: 'tool_use' }) + '\n',
);

const run = spawnSync(process.execPath, [dispatchIndex, 'status', '--label', 'jsonl-live'], {
env: {
...process.env,
HOME: tempDir,
DISPATCH_LABELS_PATH: labelsPath,
OPENCLAW_SCHEDULER_NOTIFY_DISABLED: '1',
},
encoding: 'utf8',
timeout: 10_000,
});

assert.equal(run.status, 0, run.stderr || run.stdout);
const result = JSON.parse(run.stdout);
assert.equal(result.status, 'running');
assert.equal(result.syncAction, undefined);

const labels = JSON.parse(readFileSync(labelsPath, 'utf8'));
assert.equal(labels['jsonl-live'].status, 'running');
} finally {
rmSync(tempDir, { recursive: true, force: true });
}
});

test('messages send accepts channel and delivery-to overrides for durable delivery', async (t) => {
const tempDir = mkdtempSync(join(tmpdir(), 'openclaw-scheduler-test-'));
const dbPath = join(tempDir, 'scheduler.sqlite');
Expand Down