2323import static org .junit .Assert .fail ;
2424import static org .mockito .Matchers .any ;
2525import static org .mockito .Matchers .isA ;
26+ import static org .mockito .Matchers .same ;
2627import static org .mockito .Mockito .doThrow ;
28+ import static org .mockito .Mockito .never ;
29+ import static org .mockito .Mockito .times ;
2730import static org .mockito .Mockito .verify ;
2831import static org .mockito .Mockito .when ;
2932
4851import org .junit .runner .RunWith ;
4952import org .junit .runners .JUnit4 ;
5053import org .mockito .ArgumentCaptor ;
51- import org .mockito .Captor ;
5254import org .mockito .Mock ;
53- import org .mockito .Mockito ;
5455import org .mockito .MockitoAnnotations ;
5556
5657@ RunWith (JUnit4 .class )
5758public class ServerCallImplTest {
5859 @ Rule public final ExpectedException thrown = ExpectedException .none ();
5960 @ Mock private ServerStream stream ;
6061 @ Mock private ServerCall .Listener <Long > callListener ;
61- @ Captor private ArgumentCaptor <Status > statusCaptor ;
6262
6363 private ServerCallImpl <Long , Long > call ;
6464 private Context .CancellableContext context ;
6565
66- private final MethodDescriptor <Long , Long > method = MethodDescriptor .<Long , Long >newBuilder ()
67- .setType (MethodType .UNARY )
68- .setFullMethodName ("/service/method" )
69- .setRequestMarshaller (new LongMarshaller ())
70- .setResponseMarshaller (new LongMarshaller ())
71- .build ();
66+ private static final MethodDescriptor <Long , Long > UNARY_METHOD =
67+ MethodDescriptor .<Long , Long >newBuilder ()
68+ .setType (MethodType .UNARY )
69+ .setFullMethodName ("/service/method" )
70+ .setRequestMarshaller (new LongMarshaller ())
71+ .setResponseMarshaller (new LongMarshaller ())
72+ .build ();
73+
74+ private static final MethodDescriptor <Long , Long > CLIENT_STREAMING_METHOD =
75+ MethodDescriptor .<Long , Long >newBuilder ()
76+ .setType (MethodType .UNARY )
77+ .setFullMethodName ("/service/method" )
78+ .setRequestMarshaller (new LongMarshaller ())
79+ .setResponseMarshaller (new LongMarshaller ())
80+ .build ();
7281
7382 private final Metadata requestHeaders = new Metadata ();
7483
7584 @ Before
7685 public void setUp () {
7786 MockitoAnnotations .initMocks (this );
7887 context = Context .ROOT .withCancellation ();
79- call = new ServerCallImpl <Long , Long >(stream , method , requestHeaders , context ,
88+ call = new ServerCallImpl <Long , Long >(stream , UNARY_METHOD , requestHeaders , context ,
8089 DecompressorRegistry .getDefaultInstance (), CompressorRegistry .getDefaultInstance ());
8190 }
8291
@@ -158,6 +167,114 @@ public void sendMessage_closesOnFailure() {
158167 verify (stream ).close (isA (Status .class ), isA (Metadata .class ));
159168 }
160169
170+ @ Test
171+ public void sendMessage_serverSendsOne_closeOnSecondCall_unary () {
172+ sendMessage_serverSendsOne_closeOnSecondCall (UNARY_METHOD );
173+ }
174+
175+ @ Test
176+ public void sendMessage_serverSendsOne_closeOnSecondCall_clientStreaming () {
177+ sendMessage_serverSendsOne_closeOnSecondCall (CLIENT_STREAMING_METHOD );
178+ }
179+
180+ private void sendMessage_serverSendsOne_closeOnSecondCall (
181+ MethodDescriptor <Long , Long > method ) {
182+ ServerCallImpl <Long , Long > serverCall = new ServerCallImpl <Long , Long >(
183+ stream ,
184+ method ,
185+ requestHeaders ,
186+ context ,
187+ DecompressorRegistry .getDefaultInstance (),
188+ CompressorRegistry .getDefaultInstance ());
189+ serverCall .sendHeaders (new Metadata ());
190+ serverCall .sendMessage (1L );
191+ verify (stream , times (1 )).writeMessage (any (InputStream .class ));
192+ verify (stream , never ()).close (any (Status .class ), any (Metadata .class ));
193+
194+ // trying to send a second message causes gRPC to close the underlying stream
195+ serverCall .sendMessage (1L );
196+ verify (stream , times (1 )).writeMessage (any (InputStream .class ));
197+ ArgumentCaptor <Status > statusCaptor = ArgumentCaptor .forClass (Status .class );
198+ ArgumentCaptor <Metadata > metadataCaptor = ArgumentCaptor .forClass (Metadata .class );
199+ verify (stream , times (1 )).close (statusCaptor .capture (), metadataCaptor .capture ());
200+ assertEquals (Status .Code .INTERNAL , statusCaptor .getValue ().getCode ());
201+ assertEquals (ServerCallImpl .TOO_MANY_RESPONSES , statusCaptor .getValue ().getDescription ());
202+ assertTrue (metadataCaptor .getValue ().keys ().isEmpty ());
203+ }
204+
205+ @ Test
206+ public void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion_unary () {
207+ sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion (UNARY_METHOD );
208+ }
209+
210+ @ Test
211+ public void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion_clientStreaming () {
212+ sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion (CLIENT_STREAMING_METHOD );
213+ }
214+
215+ private void sendMessage_serverSendsOne_closeOnSecondCall_appRunToCompletion (
216+ MethodDescriptor <Long , Long > method ) {
217+ ServerCallImpl <Long , Long > serverCall = new ServerCallImpl <Long , Long >(
218+ stream ,
219+ method ,
220+ requestHeaders ,
221+ context ,
222+ DecompressorRegistry .getDefaultInstance (),
223+ CompressorRegistry .getDefaultInstance ());
224+ serverCall .sendHeaders (new Metadata ());
225+ serverCall .sendMessage (1L );
226+ serverCall .sendMessage (1L );
227+ verify (stream , times (1 )).writeMessage (any (InputStream .class ));
228+ verify (stream , times (1 )).close (any (Status .class ), any (Metadata .class ));
229+
230+ // App runs to completion but everything is ignored
231+ serverCall .sendMessage (1L );
232+ serverCall .close (Status .OK , new Metadata ());
233+ try {
234+ serverCall .close (Status .OK , new Metadata ());
235+ fail ("calling a second time should still cause an error" );
236+ } catch (IllegalStateException expected ) {
237+ // noop
238+ }
239+ }
240+
241+ @ Test
242+ public void serverSendsOne_okFailsOnMissingResponse_unary () {
243+ serverSendsOne_okFailsOnMissingResponse (UNARY_METHOD );
244+ }
245+
246+ @ Test
247+ public void serverSendsOne_okFailsOnMissingResponse_clientStreaming () {
248+ serverSendsOne_okFailsOnMissingResponse (CLIENT_STREAMING_METHOD );
249+ }
250+
251+ private void serverSendsOne_okFailsOnMissingResponse (
252+ MethodDescriptor <Long , Long > method ) {
253+ ServerCallImpl <Long , Long > serverCall = new ServerCallImpl <Long , Long >(
254+ stream ,
255+ method ,
256+ requestHeaders ,
257+ context ,
258+ DecompressorRegistry .getDefaultInstance (),
259+ CompressorRegistry .getDefaultInstance ());
260+ serverCall .close (Status .OK , new Metadata ());
261+ ArgumentCaptor <Status > statusCaptor = ArgumentCaptor .forClass (Status .class );
262+ ArgumentCaptor <Metadata > metadataCaptor = ArgumentCaptor .forClass (Metadata .class );
263+ verify (stream , times (1 )).close (statusCaptor .capture (), metadataCaptor .capture ());
264+ assertEquals (Status .Code .INTERNAL , statusCaptor .getValue ().getCode ());
265+ assertEquals (ServerCallImpl .MISSING_RESPONSE , statusCaptor .getValue ().getDescription ());
266+ assertTrue (metadataCaptor .getValue ().keys ().isEmpty ());
267+ }
268+
269+ @ Test
270+ public void serverSendsOne_canErrorWithoutResponse () {
271+ final String description = "test description" ;
272+ final Status status = Status .RESOURCE_EXHAUSTED .withDescription (description );
273+ final Metadata metadata = new Metadata ();
274+ call .close (status , metadata );
275+ verify (stream , times (1 )).close (same (status ), same (metadata ));
276+ }
277+
161278 @ Test
162279 public void isReady () {
163280 when (stream .isReady ()).thenReturn (true );
@@ -260,34 +377,20 @@ public void streamListener_onReady_onlyOnce() {
260377 public void streamListener_messageRead () {
261378 ServerStreamListenerImpl <Long > streamListener =
262379 new ServerCallImpl .ServerStreamListenerImpl <Long >(call , callListener , context );
263- streamListener .messageRead (method .streamRequest (1234L ));
264-
265- verify (callListener ).onMessage (1234L );
266- }
267-
268- @ Test
269- public void streamListener_messageRead_unaryFailsOnMultiple () {
270- ServerStreamListenerImpl <Long > streamListener =
271- new ServerCallImpl .ServerStreamListenerImpl <Long >(call , callListener , context );
272- streamListener .messageRead (method .streamRequest (1234L ));
273- streamListener .messageRead (method .streamRequest (1234L ));
380+ streamListener .messageRead (UNARY_METHOD .streamRequest (1234L ));
274381
275- // Makes sure this was only called once.
276382 verify (callListener ).onMessage (1234L );
277-
278- verify (stream ).close (statusCaptor .capture (), Mockito .isA (Metadata .class ));
279- assertEquals (Status .Code .INTERNAL , statusCaptor .getValue ().getCode ());
280383 }
281384
282385 @ Test
283386 public void streamListener_messageRead_onlyOnce () {
284387 ServerStreamListenerImpl <Long > streamListener =
285388 new ServerCallImpl .ServerStreamListenerImpl <Long >(call , callListener , context );
286- streamListener .messageRead (method .streamRequest (1234L ));
389+ streamListener .messageRead (UNARY_METHOD .streamRequest (1234L ));
287390 // canceling the call should short circuit future halfClosed() calls.
288391 streamListener .closed (Status .CANCELLED );
289392
290- streamListener .messageRead (method .streamRequest (1234L ));
393+ streamListener .messageRead (UNARY_METHOD .streamRequest (1234L ));
291394
292395 verify (callListener ).onMessage (1234L );
293396 }
@@ -300,7 +403,7 @@ public void streamListener_unexpectedRuntimeException() {
300403 .when (callListener )
301404 .onMessage (any (Long .class ));
302405
303- InputStream inputStream = method .streamRequest (1234L );
406+ InputStream inputStream = UNARY_METHOD .streamRequest (1234L );
304407
305408 thrown .expect (RuntimeException .class );
306409 thrown .expectMessage ("unexpected exception" );
0 commit comments