Skip to content

Commit 4dd0148

Browse files
committed
Move CoroutineScope from RpcClient/RpcServer to internal implementation details
1 parent 6711558 commit 4dd0148

File tree

8 files changed

+28
-33
lines changed

8 files changed

+28
-33
lines changed

core/src/commonMain/kotlin/kotlinx/rpc/RpcClient.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
package kotlinx.rpc
66

77
import kotlinx.coroutines.CoroutineScope
8-
import kotlinx.coroutines.Deferred
98
import kotlinx.coroutines.flow.Flow
10-
import kotlin.coroutines.CoroutineContext
119

1210
@Deprecated("Use RpcClient instead", ReplaceWith("RpcClient"), level = DeprecationLevel.ERROR)
1311
public typealias RPCClient = RpcClient
@@ -17,7 +15,7 @@ public typealias RPCClient = RpcClient
1715
* transform them, send to the server and handle responses and errors.
1816
* [CoroutineScope] defines the lifetime of the client.
1917
*/
20-
public interface RpcClient : CoroutineScope {
18+
public interface RpcClient {
2119
/**
2220
* This method is used by generated clients to perform a call to the server.
2321
*

core/src/commonMain/kotlin/kotlinx/rpc/RpcServer.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
package kotlinx.rpc
66

7-
import kotlinx.coroutines.CoroutineScope
87
import kotlinx.rpc.annotations.Rpc
98
import kotlin.reflect.KClass
109

@@ -15,7 +14,7 @@ public typealias RPCServer = RpcServer
1514
* RpcServer is used to accept RPC messages, route them to a specific service, and process given responses.
1615
* Server may contain multiple services.
1716
*/
18-
public interface RpcServer : CoroutineScope {
17+
public interface RpcServer {
1918
/**
2019
* Registers new service to the server. Server will route all designated messages to it.
2120
* Service of any type should be unique on the server, but RpcServer doesn't specify the actual retention policy.

krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public abstract class KrpcClient(
6161
private val config: KrpcConfig.Client,
6262
transport: KrpcTransport,
6363
) : RpcClient, KrpcEndpoint {
64-
final override val coroutineContext: CoroutineContext = SupervisorJob(transport.coroutineContext.job)
64+
@InternalRpcApi
65+
public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job))
6566

6667
// we make a child here, so we can send cancellation messages before closing the connection
6768
private val connector by lazy {
@@ -93,7 +94,7 @@ public abstract class KrpcClient(
9394
private var clientCancelled = false
9495

9596
init {
96-
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
97+
internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
9798
clientCancelled = true
9899

99100
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
@@ -102,12 +103,12 @@ public abstract class KrpcClient(
102103
requestChannels.clear()
103104
}
104105

105-
launch(CoroutineName("krpc-client-generic-messages")) {
106+
internalScope.launch(CoroutineName("krpc-client-generic-messages")) {
106107
connector.subscribeToGenericMessages(::handleGenericMessage)
107108
}
108109
}
109110

110-
private val initHandshake: Job = launch(CoroutineName("krpc-client-handshake")) {
111+
private val initHandshake: Job = internalScope.launch(CoroutineName("krpc-client-handshake")) {
111112
connector.sendMessage(KrpcProtocolMessage.Handshake(KrpcPlugin.ALL))
112113

113114
connector.subscribeToProtocolMessages(::handleProtocolMessage)
@@ -284,7 +285,7 @@ public abstract class KrpcClient(
284285
final override suspend fun handleCancellation(message: KrpcGenericMessage) {
285286
when (val type = message.cancellationType()) {
286287
CancellationType.ENDPOINT -> {
287-
cancel("Closing client after server cancellation") // we cancel this client
288+
internalScope.cancel("Closing client after server cancellation") // we cancel this client
288289
}
289290

290291
else -> {

krpc/krpc-ktor/krpc-ktor-server/src/commonMain/kotlin/kotlinx/rpc/krpc/ktor/server/KtorServerDsl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,6 @@ private fun Route.createRpcServer(rpcRouteBuilder: KrpcRoute.() -> Unit) {
5454
registration(server)
5555
}
5656

57-
server.coroutineContext.job.join()
57+
server.internalScope.coroutineContext.job.join()
5858
}
5959
}

krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public abstract class KrpcServer(
4747
private val config: KrpcConfig.Server,
4848
transport: KrpcTransport,
4949
) : RpcServer, KrpcEndpoint {
50-
// we make a child here, so we can send cancellation messages before closing the connection
51-
final override val coroutineContext: CoroutineContext = SupervisorJob(transport.coroutineContext.job)
50+
@InternalRpcApi
51+
public val internalScope: CoroutineScope = CoroutineScope(SupervisorJob(transport.coroutineContext.job))
5252

5353
private val logger = RpcInternalCommonLogger.logger(rpcInternalObjectId())
5454

@@ -73,13 +73,13 @@ public abstract class KrpcServer(
7373
private var cancelledByClient = false
7474

7575
init {
76-
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
76+
internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
7777
if (!cancelledByClient) {
7878
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
7979
}
8080
}
8181

82-
launch(CoroutineName("krpc-server-generic-protocol-messages")) {
82+
internalScope.launch(CoroutineName("krpc-server-generic-protocol-messages")) {
8383
connector.subscribeToProtocolMessages(::handleProtocolMessage)
8484

8585
connector.subscribeToGenericMessages(::handleGenericMessage)
@@ -108,7 +108,7 @@ public abstract class KrpcServer(
108108
) {
109109
val descriptor = serviceDescriptorOf(serviceKClass)
110110

111-
launch(CoroutineName("krpc-server-service-$descriptor")) {
111+
internalScope.launch(CoroutineName("krpc-server-service-$descriptor")) {
112112
connector.subscribeToServiceMessages(descriptor.fqName) { message ->
113113
val rpcServerService = rpcServices.computeIfAbsent(descriptor.fqName) {
114114
createNewServiceInstance(descriptor, serviceFactory)
@@ -133,7 +133,7 @@ public abstract class KrpcServer(
133133
config = config,
134134
connector = connector,
135135
supportedPlugins = supportedPlugins,
136-
serverScope = this,
136+
serverScope = internalScope,
137137
)
138138
}
139139

@@ -143,7 +143,7 @@ public abstract class KrpcServer(
143143
CancellationType.ENDPOINT -> {
144144
cancelledByClient = true
145145

146-
cancel("Server cancelled by client")
146+
internalScope.cancel("Server cancelled by client")
147147
rpcServices.clear()
148148
}
149149

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/ProtocolTestBase.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import kotlinx.rpc.krpc.server.KrpcServer
2323
import kotlinx.rpc.registerService
2424
import kotlinx.rpc.withService
2525
import kotlinx.serialization.BinaryFormat
26-
import kotlin.coroutines.CoroutineContext
2726

2827
abstract class ProtocolTestBase {
2928
protected fun runTest(

krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class CancellationTest {
7676
}
7777

7878
unskippableDelay(150) // wait for requests to reach server
79-
client.cancel()
79+
client.internalScope.cancel()
8080
firstRequestJob.join()
8181
secondRequestJob.join()
8282

@@ -85,8 +85,8 @@ class CancellationTest {
8585

8686
assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed")
8787

88-
client.join()
89-
server.join()
88+
client.internalScope.join()
89+
server.internalScope.join()
9090

9191
checkAlive(clientAlive = false, serverAlive = false)
9292
stopAllAndJoin()
@@ -105,7 +105,7 @@ class CancellationTest {
105105
}
106106

107107
unskippableDelay(150) // wait for requests to reach server
108-
server.cancel()
108+
server.internalScope.cancel()
109109
firstRequestJob.join()
110110
secondRequestJob.join()
111111

@@ -114,8 +114,8 @@ class CancellationTest {
114114

115115
assertEquals(0, serverInstances.sumOf { it.delayCounter.value }, "Expected no requests to succeed")
116116

117-
client.join()
118-
server.join()
117+
client.internalScope.join()
118+
server.internalScope.join()
119119

120120
checkAlive(clientAlive = false, serverAlive = false)
121121
stopAllAndJoin()
@@ -220,8 +220,8 @@ class CancellationTest {
220220

221221
serverInstance().firstIncomingConsumed.await()
222222

223-
client.cancel("Test request cancelled")
224-
client.join()
223+
client.internalScope.cancel("Test request cancelled")
224+
client.internalScope.join()
225225

226226
serverInstance().consumedAll.await()
227227

@@ -239,7 +239,7 @@ class CancellationTest {
239239
caught = it
240240
}.collect {
241241
if (it == 0) {
242-
client.cancel()
242+
client.internalScope.cancel()
243243
} else {
244244
fail("Expected the request to fail with cancellation of the client")
245245
}
@@ -254,7 +254,7 @@ class CancellationTest {
254254
fun testCancelledClientCancelsRequest() = runCancellationTest {
255255
launch {
256256
serverInstance().firstIncomingConsumed.await()
257-
client.cancel("Cancelled by test")
257+
client.internalScope.cancel("Cancelled by test")
258258
}
259259

260260
try {
@@ -330,8 +330,8 @@ class CancellationTest {
330330
serverAlive: Boolean = true,
331331
transportAlive: Boolean = true,
332332
) {
333-
checkAlive(clientAlive, client, "client")
334-
checkAlive(serverAlive, server, "server")
333+
checkAlive(clientAlive, client.internalScope, "client")
334+
checkAlive(serverAlive, server.internalScope, "server")
335335
checkAlive(transportAlive, transport, "transport")
336336
}
337337

krpc/krpc-test/src/jvmTest/kotlin/kotlinx/rpc/krpc/test/api/ApiVersioningTest.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ class ApiVersioningTest {
8989
}
9090
}
9191

92-
private fun List<List<*>>.join() = joinToString { "[${it.joinToString()}]" }
93-
9492
companion object {
9593
val LIBRARY_VERSION_DIR = System.getenv("LIBRARY_VERSION")?.versionToDirName()
9694
?: error("Expected LIBRARY_VERSION env variable")

0 commit comments

Comments
 (0)