2323import static org .junit .Assert .assertNull ;
2424import static org .junit .Assert .assertSame ;
2525import static org .junit .Assert .assertTrue ;
26+ import static org .junit .Assert .fail ;
2627
2728import com .google .cloud .AsyncPage ;
2829import com .google .cloud .Page ;
2930import com .google .cloud .RetryParams ;
3031import com .google .cloud .pubsub .PubSub .ListOption ;
3132import com .google .cloud .pubsub .spi .PubSubRpc ;
33+ import com .google .cloud .pubsub .spi .PubSubRpc .PullCallback ;
34+ import com .google .cloud .pubsub .spi .PubSubRpc .PullFuture ;
3235import com .google .cloud .pubsub .spi .PubSubRpcFactory ;
3336import com .google .common .base .Function ;
3437import com .google .common .collect .ImmutableList ;
5558import com .google .pubsub .v1 .PullRequest ;
5659import com .google .pubsub .v1 .PullResponse ;
5760
61+ import org .easymock .Capture ;
5862import org .easymock .EasyMock ;
5963import org .junit .After ;
6064import org .junit .Before ;
6165import org .junit .Rule ;
6266import org .junit .Test ;
6367import org .junit .rules .ExpectedException ;
6468
69+ import java .io .IOException ;
6570import java .util .Iterator ;
6671import java .util .List ;
6772import java .util .concurrent .ExecutionException ;
@@ -1229,7 +1234,7 @@ public void testListTopicSubscriptionsAsyncWithOptions()
12291234 }
12301235
12311236 @ Test
1232- public void testPullMessages () {
1237+ public void testPullMessages () throws ExecutionException , InterruptedException {
12331238 pubsub = new PubSubImpl (options , renewerMock );
12341239 PullRequest request = PullRequest .newBuilder ()
12351240 .setSubscription (SUBSCRIPTION_NAME_PB )
@@ -1243,10 +1248,16 @@ public void testPullMessages() {
12431248 .addReceivedMessages (MESSAGE_PB1 )
12441249 .addReceivedMessages (MESSAGE_PB2 )
12451250 .build ();
1246- EasyMock .expect (pubsubRpcMock .pull (request )).andReturn (Futures .immediateFuture (response ));
1251+ Capture <PullCallback > callback = Capture .newInstance ();
1252+ PullFuture futureMock = EasyMock .createStrictMock (PullFuture .class );
1253+ futureMock .addCallback (EasyMock .capture (callback ));
1254+ EasyMock .expectLastCall ();
1255+ EasyMock .expect (futureMock .get ()).andReturn (response );
1256+ EasyMock .expect (pubsubRpcMock .pull (request )).andReturn (futureMock );
12471257 renewerMock .add (SUBSCRIPTION , ImmutableList .of ("ackId1" , "ackId2" ));
1248- EasyMock .replay (pubsubRpcMock , renewerMock );
1258+ EasyMock .replay (pubsubRpcMock , renewerMock , futureMock );
12491259 Iterator <ReceivedMessage > messageIterator = pubsub .pull (SUBSCRIPTION , 42 );
1260+ callback .getValue ().success (response );
12501261 EasyMock .reset (renewerMock );
12511262 for (ReceivedMessage message : messageList ) {
12521263 renewerMock .remove (SUBSCRIPTION , message .ackId ());
@@ -1256,6 +1267,7 @@ public void testPullMessages() {
12561267 while (messageIterator .hasNext ()) {
12571268 messageIterator .next ();
12581269 }
1270+ EasyMock .verify (futureMock );
12591271 }
12601272
12611273 @ Test
@@ -1273,10 +1285,16 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
12731285 .addReceivedMessages (MESSAGE_PB1 )
12741286 .addReceivedMessages (MESSAGE_PB2 )
12751287 .build ();
1276- EasyMock .expect (pubsubRpcMock .pull (request )).andReturn (Futures .immediateFuture (response ));
1288+ Capture <PullCallback > callback = Capture .newInstance ();
1289+ PullFuture futureMock = EasyMock .createStrictMock (PullFuture .class );
1290+ futureMock .addCallback (EasyMock .capture (callback ));
1291+ EasyMock .expectLastCall ();
1292+ EasyMock .expect (futureMock .get ()).andReturn (response );
1293+ EasyMock .expect (pubsubRpcMock .pull (request )).andReturn (futureMock );
12771294 renewerMock .add (SUBSCRIPTION , ImmutableList .of ("ackId1" , "ackId2" ));
1278- EasyMock .replay (pubsubRpcMock , renewerMock );
1295+ EasyMock .replay (pubsubRpcMock , renewerMock , futureMock );
12791296 Iterator <ReceivedMessage > messageIterator = pubsub .pullAsync (SUBSCRIPTION , 42 ).get ();
1297+ callback .getValue ().success (response );
12801298 EasyMock .reset (renewerMock );
12811299 for (ReceivedMessage message : messageList ) {
12821300 renewerMock .remove (SUBSCRIPTION , message .ackId ());
@@ -1286,6 +1304,55 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
12861304 while (messageIterator .hasNext ()) {
12871305 messageIterator .next ();
12881306 }
1307+ EasyMock .verify (futureMock );
1308+ }
1309+
1310+ @ Test
1311+ public void testPullMessagesError () throws ExecutionException , InterruptedException {
1312+ pubsub = new PubSubImpl (options , renewerMock );
1313+ PullRequest request = PullRequest .newBuilder ()
1314+ .setSubscription (SUBSCRIPTION_NAME_PB )
1315+ .setMaxMessages (42 )
1316+ .setReturnImmediately (true )
1317+ .build ();
1318+ PubSubException exception = new PubSubException (new IOException (), false );
1319+ PullFuture futureMock = EasyMock .createStrictMock (PullFuture .class );
1320+ futureMock .addCallback (EasyMock .anyObject (PullCallback .class ));
1321+ EasyMock .expectLastCall ();
1322+ EasyMock .expect (futureMock .get ()).andThrow (new ExecutionException (exception ));
1323+ EasyMock .expect (pubsubRpcMock .pull (request )).andReturn (futureMock );
1324+ EasyMock .replay (pubsubRpcMock , renewerMock , futureMock );
1325+ try {
1326+ pubsub .pull (SUBSCRIPTION , 42 );
1327+ fail ("Expected PubSubException" );
1328+ } catch (PubSubException ex ) {
1329+ assertSame (exception , ex );
1330+ }
1331+ EasyMock .verify (futureMock );
1332+ }
1333+
1334+ @ Test
1335+ public void testPullMessagesAsyncError () throws ExecutionException , InterruptedException {
1336+ pubsub = new PubSubImpl (options , renewerMock );
1337+ PullRequest request = PullRequest .newBuilder ()
1338+ .setSubscription (SUBSCRIPTION_NAME_PB )
1339+ .setMaxMessages (42 )
1340+ .setReturnImmediately (true )
1341+ .build ();
1342+ PubSubException exception = new PubSubException (new IOException (), false );
1343+ PullFuture futureMock = EasyMock .createStrictMock (PullFuture .class );
1344+ futureMock .addCallback (EasyMock .anyObject (PullCallback .class ));
1345+ EasyMock .expectLastCall ();
1346+ EasyMock .expect (futureMock .get ()).andThrow (new ExecutionException (exception ));
1347+ EasyMock .expect (pubsubRpcMock .pull (request )).andReturn (futureMock );
1348+ EasyMock .replay (pubsubRpcMock , renewerMock , futureMock );
1349+ try {
1350+ pubsub .pullAsync (SUBSCRIPTION , 42 ).get ();
1351+ fail ("Expected ExecutionException" );
1352+ } catch (ExecutionException ex ) {
1353+ assertSame (exception , ex .getCause ());
1354+ }
1355+ EasyMock .verify (futureMock );
12891356 }
12901357
12911358 @ Test
0 commit comments