@@ -442,6 +442,8 @@ private record ReactorExchangeResponseFunction(
442442 @ Nullable ReactiveAdapter returnTypeAdapter ,
443443 boolean blockForOptional , @ Nullable Duration blockTimeout ) implements ResponseFunction {
444444
445+ private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow" ;
446+
445447 @ Override
446448 @ Nullable
447449 public Object execute (HttpRequestValues requestValues ) {
@@ -472,14 +474,16 @@ public static ResponseFunction create(ReactorHttpExchangeAdapter client, Method
472474 MethodParameter returnParam = new MethodParameter (method , -1 );
473475 Class <?> returnType = returnParam .getParameterType ();
474476 boolean isSuspending = KotlinDetector .isSuspendingFunction (method );
477+ boolean hasFlowReturnType = COROUTINES_FLOW_CLASS_NAME .equals (returnType .getName ());
478+ boolean isUnwrapped = isSuspending && !hasFlowReturnType ;
475479 if (isSuspending ) {
476- returnType = Mono .class ;
480+ returnType = ( hasFlowReturnType ? Flux . class : Mono .class ) ;
477481 }
478482
479483 ReactiveAdapter reactiveAdapter = client .getReactiveAdapterRegistry ().getAdapter (returnType );
480484
481485 MethodParameter actualParam = (reactiveAdapter != null ? returnParam .nested () : returnParam .nestedIfOptional ());
482- Class <?> actualType = isSuspending ? actualParam .getParameterType () : actualParam .getNestedParameterType ();
486+ Class <?> actualType = isUnwrapped ? actualParam .getParameterType () : actualParam .getNestedParameterType ();
483487
484488 Function <HttpRequestValues , Publisher <?>> responseFunction ;
485489 if (ClassUtils .isVoidType (actualType )) {
@@ -492,18 +496,18 @@ else if (actualType.equals(HttpHeaders.class)) {
492496 responseFunction = client ::exchangeForHeadersMono ;
493497 }
494498 else if (actualType .equals (ResponseEntity .class )) {
495- MethodParameter bodyParam = isSuspending ? actualParam : actualParam .nested ();
499+ MethodParameter bodyParam = isUnwrapped ? actualParam : actualParam .nested ();
496500 Class <?> bodyType = bodyParam .getNestedParameterType ();
497501 if (bodyType .equals (Void .class )) {
498502 responseFunction = client ::exchangeForBodilessEntityMono ;
499503 }
500504 else {
501505 ReactiveAdapter bodyAdapter = client .getReactiveAdapterRegistry ().getAdapter (bodyType );
502- responseFunction = initResponseEntityFunction (client , bodyParam , bodyAdapter , isSuspending );
506+ responseFunction = initResponseEntityFunction (client , bodyParam , bodyAdapter , isUnwrapped );
503507 }
504508 }
505509 else {
506- responseFunction = initBodyFunction (client , actualParam , reactiveAdapter , isSuspending );
510+ responseFunction = initBodyFunction (client , actualParam , reactiveAdapter , isUnwrapped );
507511 }
508512
509513 return new ReactorExchangeResponseFunction (
@@ -513,7 +517,7 @@ else if (actualType.equals(ResponseEntity.class)) {
513517 @ SuppressWarnings ("ConstantConditions" )
514518 private static Function <HttpRequestValues , Publisher <?>> initResponseEntityFunction (
515519 ReactorHttpExchangeAdapter client , MethodParameter methodParam ,
516- @ Nullable ReactiveAdapter reactiveAdapter , boolean isSuspending ) {
520+ @ Nullable ReactiveAdapter reactiveAdapter , boolean isUnwrapped ) {
517521
518522 if (reactiveAdapter == null ) {
519523 return request -> client .exchangeForEntityMono (
@@ -524,7 +528,7 @@ private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunct
524528 "ResponseEntity body must be a concrete value or a multi-value Publisher" );
525529
526530 ParameterizedTypeReference <?> bodyType =
527- ParameterizedTypeReference .forType (isSuspending ? methodParam .nested ().getGenericParameterType () :
531+ ParameterizedTypeReference .forType (isUnwrapped ? methodParam .nested ().getGenericParameterType () :
528532 methodParam .nested ().getNestedGenericParameterType ());
529533
530534 // Shortcut for Flux
0 commit comments