|
31 | 31 | import static org.mockito.Mockito.verifyNoMoreInteractions; |
32 | 32 | import static org.mockito.Mockito.when; |
33 | 33 |
|
| 34 | +import com.google.common.util.concurrent.MoreExecutors; |
34 | 35 | import io.grpc.CallOptions; |
35 | 36 | import io.grpc.ClientStreamTracer; |
36 | 37 | import io.grpc.Codec; |
@@ -77,9 +78,11 @@ public class RetriableStreamTest { |
77 | 78 | .setResponseMarshaller(new StringMarshaller()) |
78 | 79 | .build(); |
79 | 80 | private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter(); |
| 81 | + private final FakeClock fakeClock = new FakeClock(); |
80 | 82 | private final RetriableStream<String> retriableStream = |
81 | 83 | new RetriableStream<String>( |
82 | | - method, channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT) { |
| 84 | + method, channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT, |
| 85 | + MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService()) { |
83 | 86 | @Override |
84 | 87 | void postCommit() { |
85 | 88 | retriableStreamRecorder.postCommit(); |
@@ -168,7 +171,8 @@ public void retry_everythingDrained() { |
168 | 171 | sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
169 | 172 |
|
170 | 173 | inOrder.verify(retriableStreamRecorder).shouldRetry(); |
171 | | - // TODO(zdapeng): send more messages during backoff, then forward backoff ticker |
| 174 | + // TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount |
| 175 | + fakeClock.forwardNanos(0L); |
172 | 176 | inOrder.verify(retriableStreamRecorder).newSubstream(); |
173 | 177 | inOrder.verify(mockStream2).setAuthority(AUTHORITY); |
174 | 178 | inOrder.verify(mockStream2).setCompressor(COMPRESSOR); |
@@ -207,7 +211,8 @@ public void retry_everythingDrained() { |
207 | 211 | sublistenerCaptor2.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
208 | 212 |
|
209 | 213 | inOrder.verify(retriableStreamRecorder).shouldRetry(); |
210 | | - // TODO(zdapeng): send more messages during backoff, then forward backoff ticker |
| 214 | + // TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount |
| 215 | + fakeClock.forwardNanos(0L); |
211 | 216 | inOrder.verify(retriableStreamRecorder).newSubstream(); |
212 | 217 | inOrder.verify(mockStream3).setAuthority(AUTHORITY); |
213 | 218 | inOrder.verify(mockStream3).setCompressor(COMPRESSOR); |
@@ -274,11 +279,12 @@ public void retry_headersRead_cancel() { |
274 | 279 | verify(mockStream1).start(sublistenerCaptor1.capture()); |
275 | 280 |
|
276 | 281 | // retry |
277 | | - // TODO(zdapeng): forward backoff ticker |
| 282 | + // TODO(zdapeng): forward backoff ticker w/ right amount |
278 | 283 | doReturn(true).when(retriableStreamRecorder).shouldRetry(); |
279 | 284 | ClientStream mockStream2 = mock(ClientStream.class); |
280 | 285 | doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(); |
281 | 286 | sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| 287 | + fakeClock.forwardNanos(0L); |
282 | 288 |
|
283 | 289 | ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = |
284 | 290 | ArgumentCaptor.forClass(ClientStreamListener.class); |
@@ -333,11 +339,12 @@ public void retry_headersRead_closed() { |
333 | 339 | verify(mockStream1).start(sublistenerCaptor1.capture()); |
334 | 340 |
|
335 | 341 | // retry |
336 | | - // TODO(zdapeng): forward backoff ticker |
| 342 | + // TODO(zdapeng): forward backoff ticker w/ right amount |
337 | 343 | doReturn(true).when(retriableStreamRecorder).shouldRetry(); |
338 | 344 | ClientStream mockStream2 = mock(ClientStream.class); |
339 | 345 | doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(); |
340 | 346 | sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| 347 | + fakeClock.forwardNanos(0L); |
341 | 348 |
|
342 | 349 | ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = |
343 | 350 | ArgumentCaptor.forClass(ClientStreamListener.class); |
@@ -402,11 +409,12 @@ public void retry_cancel_closed() { |
402 | 409 | verify(mockStream1).start(sublistenerCaptor1.capture()); |
403 | 410 |
|
404 | 411 | // retry |
405 | | - // TODO(zdapeng): forward backoff ticker |
| 412 | + // TODO(zdapeng): forward backoff ticker w/ right amount |
406 | 413 | doReturn(true).when(retriableStreamRecorder).shouldRetry(); |
407 | 414 | ClientStream mockStream2 = mock(ClientStream.class); |
408 | 415 | doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(); |
409 | 416 | sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| 417 | + fakeClock.forwardNanos(0L); |
410 | 418 |
|
411 | 419 | ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = |
412 | 420 | ArgumentCaptor.forClass(ClientStreamListener.class); |
@@ -468,11 +476,13 @@ public void retry_unretriableClosed_cancel() { |
468 | 476 | verify(mockStream1).start(sublistenerCaptor1.capture()); |
469 | 477 |
|
470 | 478 | // retry |
471 | | - // TODO(zdapeng): forward backoff ticker |
| 479 | + // TODO(zdapeng): forward backoff ticker w/ right amount |
| 480 | + fakeClock.forwardNanos(0L); |
472 | 481 | doReturn(true).when(retriableStreamRecorder).shouldRetry(); |
473 | 482 | ClientStream mockStream2 = mock(ClientStream.class); |
474 | 483 | doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(); |
475 | 484 | sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| 485 | + fakeClock.forwardNanos(0L); |
476 | 486 |
|
477 | 487 | ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 = |
478 | 488 | ArgumentCaptor.forClass(ClientStreamListener.class); |
@@ -562,10 +572,11 @@ public void request(int numMessages) { |
562 | 572 | inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1" |
563 | 573 |
|
564 | 574 | // retry |
565 | | - // TODO(zdapeng): send more messages during backoff, then forward backoff ticker |
| 575 | + // TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount |
566 | 576 | doReturn(true).when(retriableStreamRecorder).shouldRetry(); |
567 | 577 | doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(); |
568 | 578 | sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata()); |
| 579 | + fakeClock.forwardNanos(0L); |
569 | 580 |
|
570 | 581 | inOrder.verify(mockStream2).start(sublistenerCaptor2.get()); |
571 | 582 |
|
@@ -670,11 +681,12 @@ public boolean isReady() { |
670 | 681 | readiness.add(retriableStream.isReady()); // expected true |
671 | 682 |
|
672 | 683 | // retry |
673 | | - // TODO(zdapeng): send more messages during backoff, then forward backoff ticker |
| 684 | + // TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount |
674 | 685 | doReturn(true).when(retriableStreamRecorder).shouldRetry(); |
675 | 686 | doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(); |
676 | 687 | doReturn(false).when(mockStream1).isReady(); // mockStream1 closed, so isReady false |
677 | 688 | sublistenerCaptor1.get().closed(Status.UNAVAILABLE, new Metadata()); |
| 689 | + fakeClock.forwardNanos(0L); |
678 | 690 |
|
679 | 691 | verify(mockStream2).start(any(ClientStreamListener.class)); |
680 | 692 | readiness.add(retriableStream.isReady()); // expected true |
@@ -730,8 +742,9 @@ public void start(ClientStreamListener listener) { |
730 | 742 | ClientStreamListener listener1 = sublistenerCaptor1.getValue(); |
731 | 743 |
|
732 | 744 | // retry |
733 | | - // TODO(zdapeng): send more messages during backoff, then forward backoff ticker |
| 745 | + // TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount |
734 | 746 | listener1.closed(Status.UNAVAILABLE, new Metadata()); |
| 747 | + fakeClock.forwardNanos(0L); |
735 | 748 |
|
736 | 749 | retriableStream.request(1); |
737 | 750 | verify(mockStream1, never()).request(1); |
|
0 commit comments