1+ /*
2+ * Copyright 2002-present the original author or authors.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * https://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package org.springframework.messaging.rsocket.service
18+
19+ import io.rsocket.util.DefaultPayload
20+ import kotlinx.coroutines.flow.Flow
21+ import kotlinx.coroutines.flow.flowOf
22+ import kotlinx.coroutines.flow.map
23+ import kotlinx.coroutines.flow.toList
24+ import kotlinx.coroutines.reactive.asFlow
25+ import kotlinx.coroutines.runBlocking
26+ import org.assertj.core.api.Assertions.assertThat
27+ import org.junit.jupiter.api.BeforeEach
28+ import org.junit.jupiter.api.Test
29+ import org.springframework.messaging.rsocket.RSocketRequester
30+ import org.springframework.messaging.rsocket.RSocketStrategies
31+ import org.springframework.messaging.rsocket.TestRSocket
32+ import org.springframework.util.MimeTypeUtils.TEXT_PLAIN
33+ import reactor.core.publisher.Flux
34+ import reactor.core.publisher.Mono
35+
36+ /* *
37+ * Kotlin tests for [RSocketServiceMethod].
38+ *
39+ * @author Dmitry Sulman
40+ */
41+ class RSocketServiceMethodKotlinTests {
42+
43+ private lateinit var rsocket: TestRSocket
44+
45+ private lateinit var proxyFactory: RSocketServiceProxyFactory
46+
47+ @BeforeEach
48+ fun setUp () {
49+ rsocket = TestRSocket ()
50+ val requester = RSocketRequester .wrap(rsocket, TEXT_PLAIN , TEXT_PLAIN , RSocketStrategies .create())
51+ proxyFactory = RSocketServiceProxyFactory .builder(requester).build()
52+ }
53+
54+ @Test
55+ fun fireAndForget (): Unit = runBlocking {
56+ val service = proxyFactory.createClient(SuspendingFunctionsService ::class .java)
57+
58+ val requestPayload = " request"
59+ service.fireAndForget(requestPayload)
60+
61+ assertThat(rsocket.savedMethodName).isEqualTo(" fireAndForget" )
62+ assertThat(rsocket.savedPayload?.metadataUtf8).isEqualTo(" ff" )
63+ assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
64+ }
65+
66+ @Test
67+ fun requestResponse (): Unit = runBlocking {
68+ val service = proxyFactory.createClient(SuspendingFunctionsService ::class .java)
69+
70+ val requestPayload = " request"
71+ val responsePayload = " response"
72+ rsocket.setPayloadMonoToReturn(Mono .just(DefaultPayload .create(responsePayload)))
73+ val response = service.requestResponse(requestPayload)
74+
75+ assertThat(response).isEqualTo(responsePayload)
76+ assertThat(rsocket.savedMethodName).isEqualTo(" requestResponse" )
77+ assertThat(rsocket.savedPayload?.metadataUtf8).isEqualTo(" rr" )
78+ assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
79+ }
80+
81+ @Test
82+ fun requestStream (): Unit = runBlocking {
83+ val service = proxyFactory.createClient(SuspendingFunctionsService ::class .java)
84+
85+ val requestPayload = " request"
86+ val responsePayload1 = " response1"
87+ val responsePayload2 = " response2"
88+ rsocket.setPayloadFluxToReturn(
89+ Flux .just(DefaultPayload .create(responsePayload1), DefaultPayload .create(responsePayload2)))
90+ val response = service.requestStream(requestPayload).toList()
91+
92+ assertThat(response).containsExactly(responsePayload1, responsePayload2)
93+ assertThat(rsocket.savedMethodName).isEqualTo(" requestStream" )
94+ assertThat(rsocket.savedPayload?.metadataUtf8).isEqualTo(" rs" )
95+ assertThat(rsocket.savedPayload?.dataUtf8).isEqualTo(requestPayload)
96+ }
97+
98+ @Test
99+ fun requestChannel (): Unit = runBlocking {
100+ val service = proxyFactory.createClient(SuspendingFunctionsService ::class .java)
101+
102+ val requestPayload1 = " request1"
103+ val requestPayload2 = " request2"
104+ val responsePayload1 = " response1"
105+ val responsePayload2 = " response2"
106+ rsocket.setPayloadFluxToReturn(
107+ Flux .just(DefaultPayload .create(responsePayload1), DefaultPayload .create(responsePayload2)))
108+ val response = service.requestChannel(flowOf(requestPayload1, requestPayload2)).toList()
109+
110+ assertThat(response).containsExactly(responsePayload1, responsePayload2)
111+ assertThat(rsocket.savedMethodName).isEqualTo(" requestChannel" )
112+
113+ val savedPayloads = rsocket.savedPayloadFlux
114+ ?.asFlow()
115+ ?.map { it.dataUtf8 }
116+ ?.toList()
117+ assertThat(savedPayloads).containsExactly(requestPayload1, requestPayload2)
118+ }
119+
120+ private interface SuspendingFunctionsService {
121+
122+ @RSocketExchange(" ff" )
123+ suspend fun fireAndForget (input : String )
124+
125+ @RSocketExchange(" rr" )
126+ suspend fun requestResponse (input : String ): String
127+
128+ @RSocketExchange(" rs" )
129+ suspend fun requestStream (input : String ): Flow <String >
130+
131+ @RSocketExchange(" rc" )
132+ suspend fun requestChannel (input : Flow <String >): Flow <String >
133+ }
134+ }
0 commit comments