Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ data class GraphAgent(
*/
val x402Budgets: List<X402BudgetedResource>,

/**
* A set of events that should cause this agent to enter a sleeping state
* @see GraphAgentRequest.sleepEvents
*/
val sleepEvents: Set<SleepEvent> = emptySet(),

/**
* A set of events that should cause this agent to wake from a sleeping state
* @see GraphAgentRequest.sleepEvents
*/
val wakeEvents: Set<WakeEvent> = emptySet(),

/**
* Runtime secret ID. This is given to agents as an environment variable so that they may identify themselves to
* the server securely. This is useful for example, when consuming x402 budgets, we do not want to let agent A
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ data class GraphAgentRequest(

@Description("An optional list of resources and an accompanied budget that this agent may spend on services that accept x402 payments")
val x402Budgets: List<X402BudgetedResource> = listOf(),

@Description("Events that should cause this agent to go to sleep")
@SerialName("sleepEvents")
val sleepEvents: Set<SleepEvent> = emptySet(),

@Description("Events that should cause this agent to wake up")
@SerialName("wakeEvents")
val wakeEvents: Set<WakeEvent> = emptySet(),
) {
/**
* Given a reference to the agent registry [AgentRegistry], this function will attempt to convert this request into
Expand Down Expand Up @@ -124,6 +132,38 @@ data class GraphAgentRequest(
plugins = plugins,
provider = provider,
x402Budgets = x402Budgets,
sleepEvents = sleepEvents,
wakeEvents = wakeEvents,
)
}
}

/**
* Events that will put the agent into a sleeping state.
*/
@Serializable
enum class SleepEvent {
@SerialName("agent_started")
AGENT_STARTED,

@SerialName("removed_from_last_thread")
REMOVED_FROM_LAST_THREAD,

@SerialName("last_thread_closed")
LAST_THREAD_CLOSED,
}

/**
* Events that will wake the agent from a sleeping state.
*/
@Serializable
enum class WakeEvent {
@SerialName("added_to_any_thread")
ADDED_TO_ANY_THREAD,

@SerialName("added_to_thread_first_time")
ADDED_TO_THREAD_FIRST_TIME,

@SerialName("mentioned")
MENTIONED,
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ abstract class McpTool<T>() {
internal abstract suspend fun execute(mcpServer: CoralAgentIndividualMcp, arguments: T): McpToolResult

internal suspend fun executeRaw(mcpServer: CoralAgentIndividualMcp, request: CallToolRequest): CallToolResult {
mcpServer.localSession.awaitAwake(mcpServer.connectedAgentId)

val arguments = try {
apiJsonConfig.decodeFromString(argumentsSerializer, request.arguments.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import io.modelcontextprotocol.kotlin.sdk.TextResourceContents
import org.coralprotocol.coralserver.mcp.McpResources.AGENT_RESOURCE_URI
import org.coralprotocol.coralserver.server.CoralAgentIndividualMcp

private fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult {
private suspend fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult {
localSession.awaitAwake(connectedAgentId)
val otherAgents = localSession
.agents
.filter { (name, _) -> name != connectedAgentId }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ and expect or require a response from another agent, use the $WAIT_FOR_MENTIONS
In most cases assistant message output will not reach the user. Use tooling where possible to communicate with the user instead.
"""

private fun handle(request: ReadResourceRequest): ReadResourceResult {
private suspend fun CoralAgentIndividualMcp.handle(request: ReadResourceRequest): ReadResourceResult {
localSession.awaitAwake(connectedAgentId)
return ReadResourceResult(
contents = listOf(
TextResourceContents(
Expand All @@ -39,14 +40,14 @@ fun CoralAgentIndividualMcp.addInstructionResource() {
description = "Coral instructions resource",
uri = INSTRUCTION_RESOURCE_URI.toString(),
mimeType = "text/markdown",
readHandler = ::handle,
readHandler = { request -> this.handle(request) },
)
// deprecated
addResource(
name = "instructions",
description = "Coral instructions resource",
uri = "Instruction.resource",
mimeType = "text/markdown",
readHandler = ::handle,
readHandler = { request -> this.handle(request) },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import org.coralprotocol.coralserver.models.ResolvedThread
import org.coralprotocol.coralserver.models.resolve
import org.coralprotocol.coralserver.server.CoralAgentIndividualMcp

private fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult {
private suspend fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult {
localSession.awaitAwake(connectedAgentId)
val threadsAgentPrivyIn: List<ResolvedThread> = this.localSession.getAllThreadsAgentParticipatesIn(this.connectedAgentId).map { it -> it.resolve() }
val renderedThreads: String = XML.encodeToString(threadsAgentPrivyIn, rootName = QName("threads"))
return ReadResourceResult(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package org.coralprotocol.coralserver.routes.api.v1

import io.github.smiley4.ktoropenapi.resources.get
import io.github.smiley4.ktoropenapi.resources.post
import io.github.smiley4.ktoropenapi.resources.put
import io.ktor.http.*
import io.ktor.resources.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import kotlinx.serialization.Serializable
import org.coralprotocol.coralserver.server.RouteException
import org.coralprotocol.coralserver.session.LocalSessionManager

@Resource("/api/v1/sessions/{sessionId}/{agentId}/sleeping")
class Sleeping(val sessionId: String, val agentId: String)

@Serializable
data class SleepState(val sleeping: Boolean)


fun Routing.sleepingRoutes(localSessionManager: LocalSessionManager) {

get<Sleeping>({
summary = "Get agent sleeping state"
description = "Returns whether a given agent instance in a session is sleeping"
operationId = "getAgentSleeping"
request {
pathParameter<String>("sessionId") { description = "The session ID" }
pathParameter<String>("agentId") { description = "The agent ID within the session" }
}
response {
HttpStatusCode.OK to {
description = "Success"
body<SleepState> { description = "Current sleeping state" }
}
HttpStatusCode.NotFound to {
description = "Session or agent not found"
body<RouteException> { description = "Error details" }
}
}
}) {
val session = localSessionManager.getSession(it.sessionId) ?: throw RouteException(HttpStatusCode.NotFound, "Session not found")
val agent = session.getAgent(it.agentId) ?: throw RouteException(HttpStatusCode.NotFound, "Agent not found in session")
call.respond(HttpStatusCode.OK, SleepState(agent.sleeping))
}

put<Sleeping>({
summary = "Set agent sleeping state"
description = "Sets whether a given agent instance in a session is sleeping"
operationId = "setAgentSleeping"
request {
body<SleepState> { description = "Desired sleeping state" }
}
response {
HttpStatusCode.OK to {
description = "Success"
body<SleepState> { description = "Updated sleeping state" }
}
HttpStatusCode.NotFound to {
description = "Session or agent not found"
body<RouteException> { description = "Error details" }
}
}
}) { sleeping ->
val body = call.receive<SleepState>()
val result = setSleepingState(localSessionManager, sleeping.sessionId, sleeping.agentId, body.sleeping)
call.respond(HttpStatusCode.OK, result)
}


// TODO: This POST endpoint duplicates the PUT behavior. Not strictly REST-compliant,
// but provided because custom tools can only issue POST requests.
post<Sleeping>({
summary = "Set agent sleeping state (POST)"
description = "Same behavior as PUT: sets whether a given agent instance in a session is sleeping"
operationId = "setAgentSleepingPost"
request {
body<SleepState> { description = "Desired sleeping state" }
}
response {
HttpStatusCode.OK to {
description = "Success"
body<SleepState> { description = "Updated sleeping state" }
}
HttpStatusCode.NotFound to {
description = "Session or agent not found"
body<RouteException> { description = "Error details" }
}
}
}) { sleeping ->
val body = call.receive<SleepState>()
val result = setSleepingState(localSessionManager, sleeping.sessionId, sleeping.agentId, body.sleeping)
call.respond(HttpStatusCode.OK, result)
}
}

private fun setSleepingState(
localSessionManager: LocalSessionManager,
sessionId: String,
agentId: String,
desiredSleeping: Boolean
): SleepState {
val session = localSessionManager.getSession(sessionId)
?: throw RouteException(HttpStatusCode.NotFound, "Session not found")
if (!session.setAgentSleeping(agentId, desiredSleeping)) {
throw RouteException(HttpStatusCode.NotFound, "Agent not found in session")
}
return SleepState(session.isAgentSleeping(agentId))
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class CoralServer(
debugApiRoutes(localSessionManager)
sessionApiRoutes(registry, localSessionManager, devmode)
messageApiRoutes(mcpServersByTransportId, localSessionManager, remoteSessionManager)
sleepingRoutes(localSessionManager)
telemetryApiRoutes(localSessionManager)
documentationApiRoutes()
agentApiRoutes(registry, blockchainService, remoteSessionManager, jupiterService, config.paymentConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package org.coralprotocol.coralserver.session
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.util.collections.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.currentCoroutineContext
import org.coralprotocol.coralserver.EventBus
import org.coralprotocol.coralserver.agent.graph.AgentGraph
import org.coralprotocol.coralserver.agent.graph.SleepEvent
import org.coralprotocol.coralserver.agent.graph.WakeEvent
import org.coralprotocol.coralserver.models.Message
import org.coralprotocol.coralserver.models.Thread
import org.coralprotocol.coralserver.models.resolve
Expand Down Expand Up @@ -45,6 +49,10 @@ class LocalSession(
private val eventBus = EventBus<SessionEvent>()
val events get() = eventBus.events

private val firstThreadAdded = ConcurrentSet<String>()

private val sleepingWaiters = ConcurrentHashMap<String, CompletableDeferred<Unit>>()

init {
agentGraph?.run {
for (id in agents.keys) {
Expand Down Expand Up @@ -128,6 +136,13 @@ class LocalSession(
)
agents[sessionAgent.id] = sessionAgent

graphAgent?.sleepEvents?.let { events ->
if (events.contains(SleepEvent.AGENT_STARTED)) {
setAgentSleeping(agentId, true)
}
}
// TODO: eventBus.emit(SessionEvent.AgentRegistered(sessionAgent)) ?

return sessionAgent
}

Expand Down Expand Up @@ -202,6 +217,18 @@ class LocalSession(
if (!thread.participants.contains(participantId)) {
thread.participants.add(participantId)
lastReadMessageIndex[Pair(participantId, threadId)] = thread.messages.size

val graphAgent = agentGraph?.agents[participantId]
val isFirstTime = !firstThreadAdded.contains(participantId)
if (graphAgent != null) { // else devmode
if (graphAgent.wakeEvents.contains(WakeEvent.ADDED_TO_ANY_THREAD)) {
setAgentSleeping(participantId, false)
}
if (isFirstTime && graphAgent.wakeEvents.contains(WakeEvent.ADDED_TO_THREAD_FIRST_TIME)) {
setAgentSleeping(participantId, false)
}
}
if (isFirstTime) firstThreadAdded.add(participantId)
}
return true
}
Expand All @@ -211,7 +238,15 @@ class LocalSession(

if (thread.isClosed) return false

return thread.participants.remove(participantId)
val wasActuallyRemoved = thread.participants.remove(participantId)
if (wasActuallyRemoved) {
val hasRemainingOpenThreads = threads.values.any { it.participants.contains(participantId) && !it.isClosed }
val graphAgent = agentGraph?.agents[participantId]
if (!hasRemainingOpenThreads && graphAgent?.sleepEvents?.contains(SleepEvent.REMOVED_FROM_LAST_THREAD) == true) {
setAgentSleeping(participantId, true)
}
}
return wasActuallyRemoved
}

fun closeThread(threadId: String, summary: String): Boolean {
Expand All @@ -220,6 +255,16 @@ class LocalSession(
thread.isClosed = true
thread.summary = summary

// Sleep agents that now have no open threads if configured
val participantsSnapshot = thread.participants.toList()
participantsSnapshot.forEach { participantId ->
val graphAgent = agentGraph?.agents[participantId]
val hasAnyOpenThreads = threads.values.any { it.participants.contains(participantId) && !it.isClosed }
if (!hasAnyOpenThreads && graphAgent?.sleepEvents?.contains(SleepEvent.LAST_THREAD_CLOSED) == true) {
setAgentSleeping(participantId, true)
}
}

return true
}

Expand Down Expand Up @@ -266,6 +311,11 @@ class LocalSession(
if (deferred != null && !deferred.isCompleted) {
deferred.complete(listOf(message))
}
// Wake on mention if configured
val graphAgent = agentGraph?.agents[mentionId]
if (graphAgent?.wakeEvents?.contains(WakeEvent.MENTIONED) == true) {
setAgentSleeping(mentionId, false)
}
}
}

Expand Down Expand Up @@ -332,4 +382,36 @@ class LocalSession(
super.destroy(sessionCloseMode)
clearAll()
}

fun isAgentSleeping(agentId: String): Boolean {
return agents[agentId]?.sleeping ?: false
}

fun setAgentSleeping(agentId: String, sleeping: Boolean): Boolean {
val agent = agents[agentId] ?: return false
agent.sleeping = sleeping
if (!sleeping) {
sleepingWaiters.remove(agentId)?.complete(Unit)
} else {
sleepingWaiters.remove(agentId)
}
return true
}

suspend fun awaitAwake(agentId: String) {
val agent = agents[agentId] ?: return
if (!agent.sleeping) return
var waiter = sleepingWaiters[agentId]
if (waiter == null) {
val newWaiter = CompletableDeferred<Unit>()
val prev = sleepingWaiters.putIfAbsent(agentId, newWaiter)
waiter = prev ?: newWaiter
}
if (!agent.sleeping) {
sleepingWaiters.remove(agentId)?.complete(Unit)
return
}
currentCoroutineContext().ensureActive()
waiter.await()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ data class SessionAgent(
val id: String,
var description: String = "",
var state: SessionAgentState = SessionAgentState.Disconnected,
var sleeping: Boolean = false,
var mcpUrl: String?,
val extraTools: Set<CustomTool> = setOf(),
val coralPlugins: Set<GraphAgentPlugin> = setOf(),
Expand Down
Loading