Skip to content

Commit 8caae64

Browse files
authored
feat(core): support agent graceful shutdown (#909)
## AgentScope-Java Version

1.0.10-SNAPSHOT

## Description

Closes #907

## Checklist

Please check the following items before code is ready to be reviewed.

- [x] Code has been formatted with `mvn spotless:apply`
- [x] All tests are passing (`mvn test`)
- [x] Javadoc comments are complete and follow project conventions
- [x] Related documentation has been updated (e.g. links, examples, etc.)
- [x] Code is ready for review
1 parent 762672e commit 8caae64

26 files changed

+3193
-63
lines changed

agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.agentscope.core.hook.ReasoningChunkEvent;
3030
import io.agentscope.core.hook.SummaryChunkEvent;
3131
import io.agentscope.core.interruption.InterruptContext;
32+
import io.agentscope.core.interruption.InterruptSource;
3233
import io.agentscope.core.memory.InMemoryMemory;
3334
import io.agentscope.core.memory.LongTermMemory;
3435
import io.agentscope.core.memory.LongTermMemoryMode;
@@ -56,6 +57,9 @@
5657
import io.agentscope.core.rag.model.Document;
5758
import io.agentscope.core.rag.model.RetrieveConfig;
5859
import io.agentscope.core.session.Session;
60+
import io.agentscope.core.shutdown.AgentShuttingDownException;
61+
import io.agentscope.core.shutdown.GracefulShutdownManager;
62+
import io.agentscope.core.shutdown.PartialReasoningPolicy;
5963
import io.agentscope.core.skill.SkillBox;
6064
import io.agentscope.core.skill.SkillHook;
6165
import io.agentscope.core.state.AgentMetaState;
@@ -130,6 +134,8 @@
130134
public class ReActAgent extends StructuredOutputCapableAgent {
131135

132136
private static final Logger log = LoggerFactory.getLogger(ReActAgent.class);
137+
private static final GracefulShutdownManager shutdownManager =
138+
GracefulShutdownManager.getInstance();
133139

134140
// ==================== Core Dependencies ====================
135141

@@ -224,8 +230,15 @@ public void saveTo(Session session, SessionKey sessionKey) {
224230
* @param session the session to load state from
225231
* @param sessionKey the session identifier
226232
*/
233+
@Override
234+
public boolean loadIfExists(Session session, SessionKey sessionKey) {
235+
// Bind this agent to the session in the shared shutdown manager before delegating
// to the superclass load; the binding happens whether or not the session exists.
// NOTE(review): presumably this lets the manager persist the agent's state when a
// shutdown interrupt is observed — confirm against GracefulShutdownManager.
shutdownManager.bindSession(this, session, sessionKey);
236+
return super.loadIfExists(session, sessionKey);
237+
}
238+
227239
@Override
228240
public void loadFrom(Session session, SessionKey sessionKey) {
241+
shutdownManager.bindSession(this, session, sessionKey);
229242
// Load memory if managed
230243
if (statePersistence.memoryManaged()) {
231244
memory.loadFrom(session, sessionKey);
@@ -440,10 +453,19 @@ private Mono<Msg> reasoning(int iter, boolean ignoreMaxIters) {
440453
.onErrorResume(
441454
InterruptedException.class,
442455
error -> {
443-
// Save accumulated message before propagating interrupt
444456
Msg msg = context.buildFinalMessage();
445457
if (msg != null) {
446-
memory.addMessage(msg);
458+
boolean discard =
459+
getInterruptSource() == InterruptSource.SYSTEM
460+
&& shutdownManager
461+
.getConfig()
462+
.partialReasoningPolicy()
463+
== PartialReasoningPolicy.DISCARD;
464+
// A user (manual) interruption always saves the partial message; a system
465+
// interruption discards it only when the configured policy is DISCARD
466+
if (!discard) {
467+
memory.addMessage(msg);
468+
}
447469
}
448470
return Mono.error(error);
449471
})
@@ -932,6 +954,11 @@ private Mono<Void> notifySummaryChunk(
932954

933955
@Override
934956
protected Mono<Msg> handleInterrupt(InterruptContext context, Msg... originalArgs) {
957+
if (context.getSource() == InterruptSource.SYSTEM) {
958+
shutdownManager.saveOnInterruptObserved(this);
959+
return Mono.error(new AgentShuttingDownException());
960+
}
961+
935962
String recoveryText = "I noticed that you have interrupted me. What can I do for you?";
936963

937964
Msg recoveryMsg =

agentscope-core/src/main/java/io/agentscope/core/agent/AgentBase.java

Lines changed: 72 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.agentscope.core.interruption.InterruptContext;
2424
import io.agentscope.core.interruption.InterruptSource;
2525
import io.agentscope.core.message.Msg;
26+
import io.agentscope.core.shutdown.GracefulShutdownHook;
27+
import io.agentscope.core.shutdown.GracefulShutdownManager;
2628
import io.agentscope.core.state.StateModule;
2729
import io.agentscope.core.tracing.TracerRegistry;
2830
import java.util.ArrayList;
@@ -93,14 +95,18 @@ public abstract class AgentBase implements StateModule, Agent {
9395
private final AtomicBoolean running = new AtomicBoolean(false);
9496
private final boolean checkRunning;
9597
private final List<Hook> hooks;
96-
private static final List<Hook> systemHooks = new CopyOnWriteArrayList<>();
98+
private static final List<Hook> systemHooks =
99+
new CopyOnWriteArrayList<>(
100+
List.of(new GracefulShutdownHook(GracefulShutdownManager.getInstance())));
97101
private final Map<String, List<AgentBase>> hubSubscribers = new ConcurrentHashMap<>();
98102

99103
// Interrupt state management (available to all agents)
100104
private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
101105
private final AtomicReference<Msg> userInterruptMessage = new AtomicReference<>(null);
102106
// Hook non-null
103107
private static final Comparator<Hook> HOOK_COMPARATOR = Comparator.comparingInt(Hook::priority);
108+
private final AtomicReference<InterruptSource> interruptSource =
109+
new AtomicReference<>(InterruptSource.USER);
104110

105111
/**
106112
* Constructor for AgentBase.
@@ -165,14 +171,7 @@ public final String getDescription() {
165171
@Override
166172
public final Mono<Msg> call(List<Msg> msgs) {
167173
return Mono.using(
168-
() -> {
169-
if (checkRunning && !running.compareAndSet(false, true)) {
170-
throw new IllegalStateException(
171-
"Agent is still running, please wait for it to finish");
172-
}
173-
resetInterruptFlag();
174-
return this;
175-
},
174+
this::acquireExecution,
176175
resource ->
177176
TracerRegistry.get()
178177
.callAgent(
@@ -185,7 +184,7 @@ public final Mono<Msg> call(List<Msg> msgs) {
185184
.onErrorResume(
186185
createErrorHandler(
187186
msgs.toArray(new Msg[0])))),
188-
resource -> running.set(false),
187+
this::releaseExecution,
189188
true);
190189
}
191190

@@ -201,14 +200,7 @@ public final Mono<Msg> call(List<Msg> msgs) {
201200
@Override
202201
public final Mono<Msg> call(List<Msg> msgs, Class<?> structuredOutputClass) {
203202
return Mono.using(
204-
() -> {
205-
if (checkRunning && !running.compareAndSet(false, true)) {
206-
throw new IllegalStateException(
207-
"Agent is still running, please wait for it to finish");
208-
}
209-
resetInterruptFlag();
210-
return this;
211-
},
203+
this::acquireExecution,
212204
resource ->
213205
TracerRegistry.get()
214206
.callAgent(
@@ -225,7 +217,7 @@ public final Mono<Msg> call(List<Msg> msgs, Class<?> structuredOutputClass) {
225217
.onErrorResume(
226218
createErrorHandler(
227219
msgs.toArray(new Msg[0])))),
228-
resource -> running.set(false),
220+
this::releaseExecution,
229221
true);
230222
}
231223

@@ -241,14 +233,7 @@ public final Mono<Msg> call(List<Msg> msgs, Class<?> structuredOutputClass) {
241233
@Override
242234
public final Mono<Msg> call(List<Msg> msgs, JsonNode schema) {
243235
return Mono.using(
244-
() -> {
245-
if (checkRunning && !running.compareAndSet(false, true)) {
246-
throw new IllegalStateException(
247-
"Agent is still running, please wait for it to finish");
248-
}
249-
resetInterruptFlag();
250-
return this;
251-
},
236+
this::acquireExecution,
252237
resource ->
253238
TracerRegistry.get()
254239
.callAgent(
@@ -261,7 +246,7 @@ public final Mono<Msg> call(List<Msg> msgs, JsonNode schema) {
261246
.onErrorResume(
262247
createErrorHandler(
263248
msgs.toArray(new Msg[0])))),
264-
resource -> running.set(false),
249+
this::releaseExecution,
265250
true);
266251
}
267252

@@ -318,6 +303,7 @@ public static void removeSystemHook(Hook hook) {
318303
*/
319304
@Override
320305
public void interrupt() {
306+
// This overload always records USER as the interruption source.
// NOTE(review): the source is written before the flag, presumably so that code
// which sees the flag also sees the matching source — confirm.
interruptSource.set(InterruptSource.USER);
321307
interruptFlag.set(true);
322308
}
323309

@@ -329,12 +315,23 @@ public void interrupt() {
329315
*/
330316
@Override
331317
public void interrupt(Msg msg) {
318+
interruptSource.set(InterruptSource.USER);
332319
interruptFlag.set(true);
333320
if (msg != null) {
334321
userInterruptMessage.set(msg);
335322
}
336323
}
337324

325+
/**
326+
* Interrupt execution with an explicit source.
327+
*
* <p>Records the given source and then raises the interrupt flag. Unlike
* {@link #interrupt()} and {@link #interrupt(Msg)}, which always record
* {@code InterruptSource.USER}, a {@code null} source passed here is recorded as
* {@code InterruptSource.SYSTEM}. This overload does not set a user interrupt message.
*
328+
* @param source interruption source; {@code null} is treated as {@code InterruptSource.SYSTEM}
329+
*/
330+
public void interrupt(InterruptSource source) {
331+
interruptSource.set(source != null ? source : InterruptSource.SYSTEM);
332+
interruptFlag.set(true);
333+
}
334+
338335
/**
339336
* Check if the agent execution has been interrupted (reactive version).
340337
* Returns a Mono that completes normally if not interrupted, or errors with
@@ -376,6 +373,7 @@ protected Mono<Void> checkInterruptedAsync() {
376373
protected void resetInterruptFlag() {
377374
// Clear all three pieces of interrupt state: the flag, the pending user message,
// and the recorded source (which returns to its initial value, USER).
interruptFlag.set(false);
378375
userInterruptMessage.set(null);
376+
interruptSource.set(InterruptSource.USER);
379377
}
380378

381379
/**
@@ -386,11 +384,47 @@ protected void resetInterruptFlag() {
386384
*/
387385
private InterruptContext createInterruptContext() {
388386
return InterruptContext.builder()
389-
.source(InterruptSource.USER)
387+
.source(interruptSource.get())
390388
.userMessage(userInterruptMessage.get())
391389
.build();
392390
}
393391

392+
/**
393+
* Acquire execution resources for a {@code call()} invocation.
394+
* Used as the {@code resourceSupplier} in {@link Mono#using} to guarantee that
395+
* {@link #releaseExecution} is always called on completion, error, or cancellation.
396+
*
* <p>Steps, in order: claim the running flag (only when {@code checkRunning} is enabled),
* reset the interrupt state, verify that the shutdown manager still accepts requests, and
* register this agent as an in-flight request. If any step after the flag claim throws a
* {@link RuntimeException}, the running flag is cleared again before the exception is
* rethrown.
*
397+
* @return this agent instance
* @throws IllegalStateException if {@code checkRunning} is enabled and the agent is
*     already running
* @throws RuntimeException any runtime exception raised while resetting the interrupt state
*     or by the shutdown manager calls, rethrown unchanged
398+
*/
399+
private AgentBase acquireExecution() {
400+
// compareAndSet makes the check-and-claim atomic; it is skipped entirely when
// checkRunning is false, in which case the running flag is never set here.
if (checkRunning && !running.compareAndSet(false, true)) {
401+
throw new IllegalStateException("Agent is still running, please wait for it to finish");
402+
}
403+
try {
404+
resetInterruptFlag();
405+
GracefulShutdownManager.getInstance().ensureAcceptingRequests();
406+
GracefulShutdownManager.getInstance().registerRequest(this);
407+
} catch (RuntimeException ex) {
408+
// Roll back the flag claimed above so a rejected call does not leave the agent
// marked as running.
// NOTE(review): presumably required because the cleanup callback is not invoked
// when the resource supplier itself throws — confirm against Mono.using.
if (checkRunning) {
409+
running.set(false);
410+
}
411+
throw ex;
412+
}
413+
return this;
414+
}
415+
416+
/**
417+
* Release execution resources after a {@code call()} invocation.
418+
* Used as the {@code resourceCleanup} in {@link Mono#using} — guaranteed to run
419+
* regardless of how the reactive chain terminates (success, error, or cancel).
420+
*
* <p>Clears the running flag and then unregisters this agent from the shutdown manager.
*
421+
* @param resource the agent instance (ignored, uses {@code this})
422+
*/
423+
private void releaseExecution(AgentBase resource) {
424+
// Cleared unconditionally, i.e. also when checkRunning is false and the flag was
// never set by acquireExecution.
running.set(false);
425+
GracefulShutdownManager.getInstance().unregisterRequest(this);
426+
}
427+
394428
/**
395429
* Create error handler for call() methods.
396430
* Handles InterruptedException specially and delegates to handleInterrupt,
@@ -420,6 +454,15 @@ protected AtomicBoolean getInterruptFlag() {
420454
return interruptFlag;
421455
}
422456

457+
/**
458+
* Get current interruption source.
459+
*
* <p>The stored value starts as {@code InterruptSource.USER} and is set back to
* {@code USER} by {@link #resetInterruptFlag()}, so this method never returns {@code null}
* and returns {@code USER} when no interrupt has been requested.
*
460+
* @return interruption source
461+
*/
462+
protected InterruptSource getInterruptSource() {
463+
return interruptSource.get();
464+
}
465+
423466
/**
424467
* Observe a message without generating a reply.
425468
* This allows agents to receive messages from other agents or the environment

agentscope-core/src/main/java/io/agentscope/core/model/transport/HttpTransportFactory.java

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@
4747
* HttpTransportFactory.register(myTransport);
4848
* }</pre>
4949
*
50-
* <p>All registered transports and the default transport will be automatically
51-
* closed when the JVM shuts down.
50+
* <p>All registered transports and the default transport will be closed
51+
* during JVM shutdown, orchestrated by {@code AgentScopeJvmShutdownHook}
52+
* after all agents have finished processing.
5253
*/
5354
public final class HttpTransportFactory {
5455

@@ -60,9 +61,6 @@ public final class HttpTransportFactory {
6061
/** List of all managed transports for cleanup. */
6162
private static final List<HttpTransport> managedTransports = new CopyOnWriteArrayList<>();
6263

63-
/** Whether the shutdown hook has been registered. */
64-
private static volatile boolean shutdownHookRegistered = false;
65-
6664
/** Lock object for synchronization. */
6765
private static final Object lock = new Object();
6866

@@ -85,7 +83,6 @@ public static HttpTransport getDefault() {
8583
if (defaultTransport == null) {
8684
defaultTransport = new JdkHttpTransport();
8785
managedTransports.add(defaultTransport);
88-
registerShutdownHookIfNeeded();
8986
log.debug("Created default HttpTransport: {}", defaultTransport);
9087
}
9188
}
@@ -107,7 +104,6 @@ public static void setDefault(HttpTransport transport) {
107104
if (transport != null && !managedTransports.contains(transport)) {
108105
managedTransports.add(transport);
109106
}
110-
registerShutdownHookIfNeeded();
111107
log.debug("Set default HttpTransport: {}", transport);
112108
}
113109
}
@@ -124,7 +120,6 @@ public static void setDefault(HttpTransport transport) {
124120
public static void register(HttpTransport transport) {
125121
if (transport != null && !managedTransports.contains(transport)) {
126122
managedTransports.add(transport);
127-
registerShutdownHookIfNeeded();
128123
log.debug("Registered HttpTransport for management: {}", transport);
129124
}
130125
}
@@ -146,30 +141,6 @@ public static boolean unregister(HttpTransport transport) {
146141
return removed;
147142
}
148143

149-
/**
150-
* Register the JVM shutdown hook if not already registered.
151-
*/
152-
private static void registerShutdownHookIfNeeded() {
153-
if (!shutdownHookRegistered) {
154-
synchronized (lock) {
155-
if (!shutdownHookRegistered) {
156-
Runtime.getRuntime()
157-
.addShutdownHook(
158-
new Thread(
159-
() -> {
160-
log.debug(
161-
"Shutdown hook triggered, closing all"
162-
+ " HttpTransports");
163-
shutdown();
164-
},
165-
"HttpTransportFactory-ShutdownHook"));
166-
shutdownHookRegistered = true;
167-
log.debug("Registered HttpTransport shutdown hook");
168-
}
169-
}
170-
}
171-
}
172-
173144
/**
174145
* Shutdown and close all managed transports.
175146
*

agentscope-core/src/main/java/io/agentscope/core/session/InMemorySession.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ public void delete(SessionKey sessionKey) {
170170
sessions.remove(sessionKeyStr);
171171
}
172172

173+
/**
* Delete one single-state entry from a session, leaving the session itself in place.
*
* <p>Does nothing when no session data is stored for {@code sessionKey}. Only the
* single-state entry for {@code key} is removed; list states are not touched here.
*
* @param sessionKey the session identifier
* @param key the state key to remove
*/
@Override
174+
public void delete(SessionKey sessionKey, String key) {
175+
String sessionKeyStr = serializeSessionKey(sessionKey);
176+
SessionData data = sessions.get(sessionKeyStr);
177+
// A missing session is treated as "nothing to delete" rather than an error.
if (data != null) {
178+
data.removeSingleState(key);
179+
}
180+
}
181+
173182
/**
174183
* List all session keys.
175184
*
@@ -224,6 +233,10 @@ State getSingleState(String key) {
224233
return singleStates.get(key);
225234
}
226235

236+
/**
* Remove the single-state entry stored under the given key, if any.
*
* @param key the state key to remove
*/
void removeSingleState(String key) {
237+
singleStates.remove(key);
238+
}
239+
227240
void setListState(String key, List<? extends State> values) {
228241
// Use List.copyOf for immutable copy to prevent external modification
229242
listStates.put(key, List.copyOf(values));

0 commit comments

Comments
 (0)