From 111b541bc90ebf87b7617413a1c386805c67896a Mon Sep 17 00:00:00 2001 From: Yakiv Yereskovskyi Date: Wed, 26 Mar 2025 22:28:23 +0000 Subject: [PATCH 1/2] propagate transport coroutine context --- .../kotlinx/rpc/krpc/server/KrpcServer.kt | 4 +- .../test/CoroutineContextPropagationTest.kt | 57 +++++++++++++++++++ .../kotlinx/rpc/krpc/test/LocalTransport.kt | 9 ++- 3 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt index 46ae5cc7..a320d3a6 100644 --- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt +++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt @@ -47,7 +47,7 @@ public abstract class KrpcServer( transport: KrpcTransport, ) : RpcServer, KrpcEndpoint { // we make a child here, so we can send cancellation messages before closing the connection - final override val coroutineContext: CoroutineContext = SupervisorJob(transport.coroutineContext.job) + final override val coroutineContext: CoroutineContext = transport.coroutineContext + SupervisorJob(transport.coroutineContext.job) private val logger = CommonLogger.logger(objectId()) @@ -130,7 +130,7 @@ public abstract class KrpcServer( descriptor: RpcServiceDescriptor, serviceFactory: (CoroutineContext) -> Service, ): KrpcServerService { - val serviceInstanceContext = SupervisorJob(coroutineContext.job) + val serviceInstanceContext = coroutineContext + SupervisorJob(coroutineContext.job) return KrpcServerService( service = serviceFactory(serviceInstanceContext), diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt new file mode 100644 index 00000000..a2944b78 --- /dev/null +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt @@ -0,0 +1,57 @@ +/* + * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.rpc.krpc.test + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.test.runTest +import kotlinx.rpc.krpc.rpcClientConfig +import kotlinx.rpc.krpc.rpcServerConfig +import kotlinx.rpc.krpc.serialization.json.json +import kotlinx.rpc.withService +import kotlin.coroutines.CoroutineContext +import kotlin.test.Test +import kotlin.test.assertEquals + +class CoroutineContextPropagationTest { + private val rpcServerConfig = rpcServerConfig { + serialization { + json() + } + } + private val rpcClientConfig = rpcClientConfig { + serialization { + json { + ignoreUnknownKeys = true + } + } + } + + object CoroutineElement : CoroutineContext.Element { + object Key : CoroutineContext.Key + + override val key: CoroutineContext.Key<*> = Key + } + + @Test + fun test() = runTest { + var actualContext: CoroutineElement? = null + val transport = LocalTransport(CoroutineScope(CoroutineElement)) + val server = KrpcTestServer(rpcServerConfig, transport.server) + val client = KrpcTestClient(rpcClientConfig, transport.client) + server.registerService(Echo::class) { + object : Echo { + override suspend fun echo(message: String): String = run { + actualContext = currentCoroutineContext().get(CoroutineElement.Key) + "response" + } + + override val coroutineContext: CoroutineContext = it + } + } + client.withService(Echo::class).echo("request") + assertEquals(actualContext, CoroutineElement) + } +} \ No newline at end of file diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt index d4cda550..257e9fc7 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt @@ -14,14 +14,16 @@ import kotlinx.rpc.krpc.KrpcTransportMessage import kotlin.coroutines.CoroutineContext class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope { - override val coroutineContext = parentScope?.run { SupervisorJob(coroutineContext.job) } + override val coroutineContext = parentScope + ?.run { coroutineContext + SupervisorJob(coroutineContext.job) } ?: SupervisorJob() private val clientIncoming = Channel() private val serverIncoming = Channel() val client: KrpcTransport = object : KrpcTransport { - override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job) + override val coroutineContext: CoroutineContext = this@LocalTransport.coroutineContext + + Job(this@LocalTransport.coroutineContext.job) override suspend fun send(message: KrpcTransportMessage) { serverIncoming.send(message) @@ -33,7 +35,8 @@ class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope { } val server: KrpcTransport = object : KrpcTransport { - override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext) + override val coroutineContext: CoroutineContext = this@LocalTransport.coroutineContext + + Job(this@LocalTransport.coroutineContext.job) override suspend fun send(message: KrpcTransportMessage) { clientIncoming.send(message) From 2a12d33bb387f9f92b641f6a27ce23034b703141 Mon Sep 17 00:00:00 2001 From: Yakiv Yereskovskyi Date: Fri, 28 Mar 2025 13:42:31 +0000 Subject: [PATCH 2/2] propagate transport coroutine context --- .../test/CoroutineContextPropagationTest.kt | 2 +- .../kotlinx/rpc/krpc/test/LocalTransport.kt | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt index a2944b78..07ae8401 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/CoroutineContextPropagationTest.kt @@ -38,7 +38,7 @@ class CoroutineContextPropagationTest { @Test fun test() = runTest { var actualContext: CoroutineElement? = null - val transport = LocalTransport(CoroutineScope(CoroutineElement)) + val transport = LocalTransport(transportContext = CoroutineElement) val server = KrpcTestServer(rpcServerConfig, transport.server) val client = KrpcTestClient(rpcClientConfig, transport.client) server.registerService(Echo::class) { diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt index 257e9fc7..d0df9f91 100644 --- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt +++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/LocalTransport.kt @@ -13,17 +13,22 @@ import kotlinx.rpc.krpc.KrpcTransport import kotlinx.rpc.krpc.KrpcTransportMessage import kotlin.coroutines.CoroutineContext -class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope { +class LocalTransport( + parentScope: CoroutineScope? = null, + transportContext: CoroutineContext? = null, +) : CoroutineScope { override val coroutineContext = parentScope - ?.run { coroutineContext + SupervisorJob(coroutineContext.job) } + ?.run { SupervisorJob(coroutineContext.job) } ?: SupervisorJob() private val clientIncoming = Channel() private val serverIncoming = Channel() val client: KrpcTransport = object : KrpcTransport { - override val coroutineContext: CoroutineContext = this@LocalTransport.coroutineContext + - Job(this@LocalTransport.coroutineContext.job) + override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job).let { + if(transportContext != null) transportContext + it + else it + } override suspend fun send(message: KrpcTransportMessage) { serverIncoming.send(message) @@ -35,8 +40,10 @@ class LocalTransport(parentScope: CoroutineScope? = null) : CoroutineScope { } val server: KrpcTransport = object : KrpcTransport { - override val coroutineContext: CoroutineContext = this@LocalTransport.coroutineContext + - Job(this@LocalTransport.coroutineContext.job) + override val coroutineContext: CoroutineContext = Job(this@LocalTransport.coroutineContext.job).let { + if(transportContext != null) transportContext + it + else it + } override suspend fun send(message: KrpcTransportMessage) { clientIncoming.send(message)