Skip to content

Commit 9f9f0a7

Browse files
committed
Polish suspending functions support in RSocketServiceMethod
Closes gh-35473
1 parent 3190269 commit 9f9f0a7

File tree

2 files changed

+30
-7
lines changed

2 files changed

+30
-7
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/service/RSocketServiceMethod.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,11 @@ private static Function<RSocketRequestValues, Object> initResponseFunction(
137137

138138
MethodParameter returnParam = new MethodParameter(method, -1);
139139
Class<?> returnType = returnParam.getParameterType();
140-
boolean isFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName());
141-
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(method) && !isFlowReturnType;
142-
if (isUnwrapped) {
143-
returnType = Mono.class;
144-
}
145-
else if (isFlowReturnType) {
146-
returnType = Flux.class;
140+
boolean isSuspending = KotlinDetector.isSuspendingFunction(method);
141+
boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME.equals(returnType.getName());
142+
boolean isUnwrapped = isSuspending && !hasFlowReturnType;
143+
if (isSuspending) {
144+
returnType = (hasFlowReturnType ? Flux.class : Mono.class);
147145
}
148146

149147
ReactiveAdapter reactiveAdapter = reactiveRegistry.getAdapter(returnType);

spring-messaging/src/test/kotlin/org/springframework/messaging/rsocket/service/RSocketServiceMethodKotlinTests.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import reactor.core.publisher.Mono
3737
* Kotlin tests for [RSocketServiceMethod].
3838
*
3939
* @author Dmitry Sulman
40+
* @author Sebastien Deleuze
4041
*/
4142
class RSocketServiceMethodKotlinTests {
4243

@@ -95,6 +96,23 @@ class RSocketServiceMethodKotlinTests {
9596
assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
9697
}
9798

99+
@Test
100+
fun nonSuspendingRequestStream(): Unit = runBlocking {
101+
val service = proxyFactory.createClient(NonSuspendingFunctionsService::class.java)
102+
103+
val requestPayload = "request"
104+
val responsePayload1 = "response1"
105+
val responsePayload2 = "response2"
106+
rsocket.setPayloadFluxToReturn(
107+
Flux.just(DefaultPayload.create(responsePayload1), DefaultPayload.create(responsePayload2)))
108+
val response = service.requestStream(requestPayload).toList()
109+
110+
assertThat(response).containsExactly(responsePayload1, responsePayload2)
111+
assertThat(rsocket.savedMethodName).isEqualTo("requestStream")
112+
assertThat(rsocket.savedPayload?.metadataUtf8).isEqualTo("rs")
113+
assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
114+
}
115+
98116
@Test
99117
fun requestChannel(): Unit = runBlocking {
100118
val service = proxyFactory.createClient(SuspendingFunctionsService::class.java)
@@ -131,4 +149,11 @@ class RSocketServiceMethodKotlinTests {
131149
@RSocketExchange("rc")
132150
suspend fun requestChannel(input: Flow<String>): Flow<String>
133151
}
152+
153+
private interface NonSuspendingFunctionsService {
154+
155+
@RSocketExchange("rs")
156+
fun requestStream(input: String): Flow<String>
157+
}
158+
134159
}

0 commit comments

Comments
 (0)