Skip to content

Commit 1d5b58b

Browse files
committed
test: add unit tests for listener drain exception handling in DelayedStream and DelayedClientCall
1 parent 3ea6a40 commit 1d5b58b

2 files changed

Lines changed: 95 additions & 0 deletions

File tree

core/src/test/java/io/grpc/internal/DelayedClientCallTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.ArgumentMatchers.same;
2424
import static org.mockito.Mockito.mock;
2525
import static org.mockito.Mockito.never;
26+
import static org.mockito.Mockito.times;
2627
import static org.mockito.Mockito.verify;
2728

2829
import com.google.common.util.concurrent.MoreExecutors;
@@ -256,6 +257,55 @@ public Void answer(org.mockito.invocation.InvocationOnMock invocation) {
256257
verify(mockRealCall).cancel(eq("Failed to drain pending calls"), same(error));
257258
}
258259

260+
@Test
261+
public void drainPendingCallbacksFails() {
262+
DelayedClientCall<String, Integer> delayedClientCall =
263+
new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null);
264+
delayedClientCall.start(listener, new Metadata());
265+
266+
final RuntimeException error = new RuntimeException("fail");
267+
org.mockito.Mockito.doAnswer(new org.mockito.stubbing.Answer<Void>() {
268+
@Override
269+
public Void answer(org.mockito.invocation.InvocationOnMock invocation) {
270+
throw error;
271+
}
272+
}).when(listener).onReady();
273+
274+
final AtomicReference<ClientCall.Listener<Integer>> listenerCaptor = new AtomicReference<>();
275+
org.mockito.Mockito.doAnswer(new org.mockito.stubbing.Answer<Void>() {
276+
@Override
277+
public Void answer(org.mockito.invocation.InvocationOnMock invocation) {
278+
ClientCall.Listener<Integer> delayedListener = invocation.getArgument(0);
279+
listenerCaptor.set(delayedListener);
280+
delayedListener.onReady();
281+
return null;
282+
}
283+
}).when(mockRealCall).start(any(ClientCall.Listener.class), any(Metadata.class));
284+
285+
Runnable runnable = delayedClientCall.setCall(mockRealCall);
286+
assertThat(runnable).isNotNull();
287+
288+
try {
289+
runnable.run();
290+
org.junit.Assert.fail("Should have thrown");
291+
} catch (RuntimeException e) {
292+
assertThat(e).isSameInstanceAs(error);
293+
}
294+
295+
ClientCall.Listener<Integer> delayedListener = listenerCaptor.get();
296+
assertThat(delayedListener).isNotNull();
297+
298+
// Verify it transitioned to passThrough by showing it forwards.
299+
try {
300+
delayedListener.onReady();
301+
org.junit.Assert.fail("Should have thrown");
302+
} catch (RuntimeException e) {
303+
assertThat(e).isSameInstanceAs(error);
304+
}
305+
// Verify it was called twice (once during drain, once just now)
306+
verify(listener, times(2)).onReady();
307+
}
308+
259309
private void callMeMaybe(Runnable r) {
260310
if (r != null) {
261311
r.run();

core/src/test/java/io/grpc/internal/DelayedStreamTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,51 @@ public Void answer(InvocationOnMock invocation) {
502502
verify(listener).closed(same(statusCaptor.getValue()), any(RpcProgress.class), any(Metadata.class));
503503
}
504504

505+
@Test
506+
public void drainPendingCallbacksFails() {
507+
stream.start(listener);
508+
final RuntimeException error = new RuntimeException("fail");
509+
doAnswer(new Answer<Void>() {
510+
@Override
511+
public Void answer(InvocationOnMock invocation) {
512+
throw error;
513+
}
514+
}).when(listener).onReady();
515+
516+
doAnswer(new Answer<Void>() {
517+
@Override
518+
public Void answer(InvocationOnMock invocation) {
519+
ClientStreamListener delayedListener = invocation.getArgument(0);
520+
delayedListener.onReady();
521+
return null;
522+
}
523+
}).when(realStream).start(any(ClientStreamListener.class));
524+
525+
Runnable runnable = stream.setStream(realStream);
526+
assertNotNull(runnable);
527+
528+
try {
529+
runnable.run();
530+
org.junit.Assert.fail("Should have thrown");
531+
} catch (RuntimeException e) {
532+
assertThat(e).isSameInstanceAs(error);
533+
}
534+
535+
verify(realStream).start(listenerCaptor.capture());
536+
ClientStreamListener delayedListener = listenerCaptor.getValue();
537+
538+
// Verify it transitioned to passThrough. If it didn't, this might NPE or buffer.
539+
// If it is passThrough, it will forward to the listener, which we know throws.
540+
try {
541+
delayedListener.onReady();
542+
org.junit.Assert.fail("Should have thrown");
543+
} catch (RuntimeException e) {
544+
assertThat(e).isSameInstanceAs(error);
545+
}
546+
// Verify it was called twice (once during drain, once just now)
547+
verify(listener, times(2)).onReady();
548+
}
549+
505550
private void callMeMaybe(Runnable r) {
506551
if (r != null) {
507552
r.run();

0 commit comments

Comments
 (0)