@@ -10,8 +10,6 @@ import spock.lang.Specification
1010import java.util.concurrent.CompletableFuture
1111import java.util.concurrent.CompletionException
1212import java.util.concurrent.CompletionStage
13- import java.util.concurrent.CountDownLatch
14- import java.util.concurrent.TimeUnit
1513import java.util.function.Function
1614
1715class CompletionStageMappingPublisherTest extends Specification {
@@ -132,30 +130,29 @@ class CompletionStageMappingPublisherTest extends Specification {
132130 when :
133131 Publisher<Integer > rxIntegers = Flowable . range(0 , 10 )
134132
135- CountDownLatch latch = new CountDownLatch ( 5 )
136- CountDownLatch exceptionLatch = new CountDownLatch ( 1 )
133+ List< Runnable > completions = []
134+
137135 def mapper = new Function<Integer , CompletionStage<String > > () {
138136 @Override
139137 CompletionStage<String > apply (Integer integer ) {
140138
141- if (integer == 5 ) {
142- return CompletableFuture . supplyAsync {
143- exceptionLatch. await(10_000, TimeUnit . SECONDS )
144- sleep(100 )
145- throw new RuntimeException (" Bang" )
146- }
139+ if (integer < 5 ) {
140+ def cf = new CompletableFuture<String > ()
141+ completions. add({ cf. complete(String . valueOf(integer)) })
142+ return cf
143+ } else if (integer == 5 ) {
144+ def cf = new CompletableFuture<String > ()
145+ completions. add({ cf. completeExceptionally(new RuntimeException (" Bang" )) })
146+ return cf
147147 } else if (integer == 6 ) {
148- return CompletableFuture . supplyAsync {
149- latch . countDown ()
150- exceptionLatch . countDown() // number 5 is now alive
151- return String . valueOf(integer )
148+ // complete 5,4,3,2,1 so we have to queue
149+ def reverse = completions . reverse ()
150+ for ( Runnable r : (reverse)) {
151+ r . run( )
152152 }
153+ return CompletableFuture . completedFuture(String . valueOf(integer))
153154 } else {
154- return CompletableFuture . supplyAsync {
155- latch. countDown()
156- latch. await(10_000, TimeUnit . SECONDS )
157- return String . valueOf(integer)
158- }
155+ return CompletableFuture . completedFuture(String . valueOf(integer))
159156 }
160157 }
161158 }
0 commit comments