@@ -268,29 +268,31 @@ public void testMap() {
268268 });
269269
270270 // Expect 2 rows from SELECT of 2 rows
271- consumeOne (connection .createStatement (
272- "SELECT x,y FROM testMap WHERE x = 0 ORDER BY y" )
273- .execute (),
274- selectResult -> {
275-
276- // Expect IllegalArgumentException for a null mapping function
277- assertThrows (
278- IllegalArgumentException .class , () -> selectResult .map (null ));
279-
280- Publisher <List <?>> selectRowPublisher =
281- selectResult .map ((row , metadata ) ->
282- asList (row .get ("x" , Integer .class ), row .get ("y" , Integer .class )));
283- awaitMany (
284- asList (asList (0 , 1 ), asList (0 , 2 )), selectRowPublisher );
285-
286- // Expect IllegalStateException from multiple Result consumptions.
287- assertThrows (IllegalStateException .class ,
288- () -> selectResult .map ((row , metadata ) -> "unexpected" ));
289- assertThrows (IllegalStateException .class , selectResult ::getRowsUpdated );
290-
291- // Expect row data publisher to reject multiple subscribers
292- awaitError (IllegalStateException .class , selectRowPublisher );
293- });
271+ awaitMany (asList (asList (0 , 1 ), asList (0 , 2 )),
272+ Mono .from (connection .createStatement (
273+ "SELECT x,y FROM testMap WHERE x = 0 ORDER BY y" )
274+ .execute ())
275+ .flatMapMany (selectResult -> {
276+ // Expect IllegalArgumentException for a null mapping function
277+ assertThrows (
278+ IllegalArgumentException .class , () -> selectResult .map (null ));
279+
280+ Publisher <List <Integer >> selectRowPublisher =
281+ selectResult .map ((row , metadata ) ->
282+ asList (
283+ row .get ("x" , Integer .class ),
284+ row .get ("y" , Integer .class )));
285+
286+ // Expect IllegalStateException from multiple Result consumptions.
287+ assertThrows (IllegalStateException .class ,
288+ () -> selectResult .map ((row , metadata ) -> "unexpected" ));
289+ assertThrows (IllegalStateException .class , selectResult ::getRowsUpdated );
290+
291+ return Flux .from (selectRowPublisher )
292+ .doFinally (signalType ->
293+ // Expect row data publisher to reject multiple subscribers
294+ awaitError (IllegalStateException .class , selectRowPublisher ));
295+ }));
294296
295297 // Expect a Row to not be valid outside of the mapping function
296298 List <Row > rows = awaitMany (Flux .from (connection .createStatement (
@@ -302,56 +304,32 @@ public void testMap() {
302304 assertThrows (IllegalStateException .class , () -> row0 .get (1 ));
303305 Row row1 = rows .get (1 );
304306 assertThrows (IllegalStateException .class , () -> row1 .get (0 ));
305- assertThrows (IllegalStateException .class , () -> row1 .get ("y" ));
306-
307- consumeOne (connection .createStatement (
308- "SELECT x,y FROM testMap WHERE x = 0 ORDER BY y" )
309- .execute (),
310- select2Result -> {
311- assertThrows (
312- IllegalArgumentException .class , () -> select2Result .map (null ));
313-
314- Publisher <List <?>> select2RowPublisher =
315- select2Result .map ((row , metadata ) ->
316- asList (row .get ("x" , Integer .class ), row .get ("y" , Integer .class )));
317- awaitMany (
318- asList (asList (0 , 1 ), asList (0 , 2 )), select2RowPublisher );
319-
320- // Expect IllegalStateException from multiple Result consumptions.
321- assertThrows (IllegalStateException .class ,
322- () -> select2Result .map ((row , metadata ) -> "unexpected" ));
323- assertThrows (IllegalStateException .class , select2Result ::getRowsUpdated );
324-
325- // Expect row data publisher to reject multiple subscribers
326- awaitError (IllegalStateException .class , select2RowPublisher );
327- });
307+ assertThrows (IllegalStateException .class , () -> row1 .get ("y" ));;
328308
329309 // Expect onError for a mapping function that throws
330310 RuntimeException thrown = new RuntimeException ("Expected" );
331- consumeOne (connection .createStatement (
311+ awaitMany (asList (Signal .next (asList (0 , 1 )), Signal .error (thrown )),
312+ Mono .from (connection .createStatement (
332313 "SELECT x,y FROM testMap WHERE x = 0 ORDER BY y" )
333- .execute (),
334- select3Result -> {
335- awaitMany (
336- asList (Signal .next (asList (0 , 1 )), Signal .error (thrown )),
337- Flux .from (select3Result .map ((row , metadata ) -> {
314+ .execute ())
315+ .flatMapMany (select3Result ->
316+ Flux .from (select3Result .map ((row , metadata ) -> {
338317 if (row .get ("y" , Integer .class ) == 1 ) {
339318 return asList (
340319 row .get ("x" , Integer .class ), row .get ("y" , Integer .class ));
341320 }
342321 else {
343322 throw thrown ;
344323 }
345- })). materialize ());
346- } );
324+ }))
325+ . materialize ()) );
347326
348327 // Expect onError for a mapping function that outputs null
349- consumeOne (connection .createStatement (
350- "SELECT x,y FROM testMap WHERE x = 0 ORDER BY y" )
351- .execute (),
352- select4Result -> {
353- List <Signal <List <Integer >>> signals =
354- awaitMany (
328+ List <Signal <List <Integer >>> signals = awaitMany (Mono .from (
329+ connection .createStatement (
330+ "SELECT x,y FROM testMap WHERE x = 0 ORDER BY y" )
331+ .execute ())
332+ .flatMapMany (select4Result ->
355333 Flux .from (select4Result .map ((row , metadata ) -> {
356334 if (row .get ("y" , Integer .class ) == 1 ) {
357335 return asList (
@@ -360,31 +338,32 @@ public void testMap() {
360338 else {
361339 return null ;
362340 }
363- })).materialize ());
364- assertEquals (signals .get (0 ).get (), asList (0 , 1 ));
365- assertEquals (
366- signals .get (1 ).getThrowable ().getClass (),
367- NullPointerException .class );
368- });
341+ })).materialize ()));
342+ assertEquals (signals .get (0 ).get (), asList (0 , 1 ));
343+ assertEquals (
344+ signals .get (1 ).getThrowable ().getClass (),
345+ NullPointerException .class );
369346
370347 // Expect no rows from DELETE
371- consumeOne (connection .createStatement (
348+ awaitNone ( Mono . from (connection .createStatement (
372349 "DELETE FROM testMap WHERE x <>:y" )
373350 .bind ("y" , 99 )
374- .execute (),
375- deleteResult -> {
351+ .execute ())
352+ .flatMap (deleteResult -> {
353+
376354 Publisher <Object > deleteRowPublisher =
377355 deleteResult .map ((row , metatdata ) -> row .get ("z" ));
378- awaitNone (deleteRowPublisher );
379356
380357 // Expect IllegalStateException from multiple Result consumptions.
381358 assertThrows (IllegalStateException .class ,
382359 () -> deleteResult .map ((row , metadata ) -> "unexpected" ));
383360 assertThrows (IllegalStateException .class , deleteResult ::getRowsUpdated );
384361
385- // Expect row data publisher to reject multiple subscribers
386- awaitError (IllegalStateException .class , deleteRowPublisher );
387- });
362+ return Mono .from (deleteRowPublisher )
363+ .doOnTerminate (() ->
364+ // Expect row data publisher to reject multiple subscribers
365+ awaitError (IllegalStateException .class , deleteRowPublisher ));
366+ }));
388367 }
389368 finally {
390369 tryAwaitExecution (connection .createStatement ("DROP TABLE testMap" ));
0 commit comments