Skip to content

Commit 38b950f

Browse files
committed
core: add transportInUse() to transport listener.
A transport is "in use" iff number of streams > 0. In following changes the channel will use this information when deciding whether it should transit to the IDLE mode (grpc#1276).
1 parent 33cd6be commit 38b950f

File tree

10 files changed

+149
-6
lines changed

10 files changed

+149
-6
lines changed

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,11 @@ private InProcessStream(MethodDescriptor<?, ?> method, Metadata headers) {
247247
private void streamClosed() {
248248
synchronized (InProcessTransport.this) {
249249
boolean justRemovedAnElement = streams.remove(this);
250-
if (shutdown && streams.isEmpty() && justRemovedAnElement) {
251-
notifyTerminated();
250+
if (streams.isEmpty() && justRemovedAnElement) {
251+
clientTransportListener.transportInUse(false);
252+
if (shutdown) {
253+
notifyTerminated();
254+
}
252255
}
253256
}
254257
}
@@ -548,6 +551,9 @@ public void start(ClientStreamListener listener) {
548551
serverStream, method.getFullMethodName(), headers);
549552
clientStream.setListener(serverStreamListener);
550553
streams.add(InProcessTransport.InProcessStream.this);
554+
if (streams.size() == 1) {
555+
clientTransportListener.transportInUse(true);
556+
}
551557
}
552558
}
553559

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers, C
124124
}
125125
PendingStream pendingStream = new PendingStream(method, headers, callOptions);
126126
pendingStreams.add(pendingStream);
127+
if (pendingStreams.size() == 1) {
128+
listener.transportInUse(true);
129+
}
127130
return pendingStream;
128131
}
129132
}
@@ -257,6 +260,15 @@ public void setTransportSupplier(final Supplier<ClientTransport> supplier) {
257260
for (final PendingStream stream : savedPendingStreams) {
258261
stream.createRealStream(supplier.get());
259262
}
263+
// TODO(zhangkun83): some transports (e.g., netty) may have a short delay between
264+
// stream.start() and transportInUse(true). If netty's transportInUse(true) is called
265+
// after the delayed transport's transportInUse(false), the channel may have a brief
266+
// period where all transports are not in-use, and may go to IDLE mode unexpectedly if
267+
// the IDLE timeout is shorter (e.g., 0) than that brief period. Maybe we should
268+
// have a minimum IDLE timeout?
269+
synchronized (lock) {
270+
listener.transportInUse(false);
271+
}
260272
}
261273
});
262274
}
@@ -375,10 +387,13 @@ public void cancel(Status reason) {
375387
super.cancel(reason);
376388
synchronized (lock) {
377389
if (pendingStreams != null) {
378-
pendingStreams.remove(this);
379-
if (shutdown && pendingStreams.isEmpty()) {
380-
pendingStreams = null;
381-
listener.transportTerminated();
390+
boolean justRemovedAnElement = pendingStreams.remove(this);
391+
if (pendingStreams.isEmpty() && justRemovedAnElement) {
392+
listener.transportInUse(false);
393+
if (shutdown) {
394+
pendingStreams = null;
395+
listener.transportTerminated();
396+
}
382397
}
383398
}
384399
}

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,8 @@ private class InterimTransportImpl implements InterimTransport<ClientTransport>
521521
}
522522

523523
@Override public void transportReady() {}
524+
525+
@Override public void transportInUse(boolean inUse) {}
524526
});
525527
boolean savedShutdown;
526528
synchronized (lock) {

core/src/main/java/io/grpc/internal/ManagedClientTransport.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,11 @@ interface Listener {
106106
* called at most once.
107107
*/
108108
void transportReady();
109+
110+
/**
111+
* Called whenever the transport's in-use state has changed. A transport is in-use when it has
112+
* at least one stream.
113+
*/
114+
void transportInUse(boolean inUse);
109115
}
110116
}

core/src/main/java/io/grpc/internal/TransportSet.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ public BaseTransportListener(ManagedClientTransport transport) {
331331
@Override
332332
public void transportReady() {}
333333

334+
@Override
335+
public void transportInUse(boolean inUse) {}
336+
334337
@Override
335338
public void transportShutdown(Status status) {}
336339

netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ final class ClientTransportLifecycleManager {
3939
private final ManagedClientTransport.Listener listener;
4040
private boolean transportReady;
4141
private boolean transportShutdown;
42+
private boolean transportInUse;
4243
/** null iff !transportShutdown. */
4344
private Status shutdownStatus;
4445
/** null iff !transportShutdown. */
@@ -67,6 +68,14 @@ public void notifyShutdown(Status s) {
6768
listener.transportShutdown(s);
6869
}
6970

71+
public void notifyInUse(boolean inUse) {
72+
if (inUse == transportInUse) {
73+
return;
74+
}
75+
transportInUse = inUse;
76+
listener.transportInUse(inUse);
77+
}
78+
7079
public void notifyTerminated(Status s) {
7180
if (transportTerminated) {
7281
return;

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,18 @@ private NettyClientHandler(Http2ConnectionDecoder decoder,
177177
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
178178
goingAway(statusFromGoAway(errorCode, ByteBufUtil.getBytes(debugData)));
179179
}
180+
181+
@Override
182+
public void onStreamAdded(Http2Stream stream) {
183+
NettyClientHandler.this.lifecycleManager.notifyInUse(true);
184+
}
185+
186+
@Override
187+
public void onStreamRemoved(Http2Stream stream) {
188+
if (connection().numActiveStreams() == 0) {
189+
NettyClientHandler.this.lifecycleManager.notifyInUse(false);
190+
}
191+
}
180192
});
181193
}
182194

netty/src/main/java/io/grpc/netty/NettyClientStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
116116
!method.getType().clientSendsOneMessage()).addListener(failureListener);
117117
}
118118

119+
@Override
120+
public void transportReportStatus(Status newStatus, boolean stopDelivery, Metadata trailers) {
121+
super.transportReportStatus(newStatus, stopDelivery, trailers);
122+
}
123+
119124
/**
120125
* Intended to be overriden by NettyClientTransport, which has more information about failures.
121126
* May only be called from event loop.

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
158158
private Http2Ping ping;
159159
@GuardedBy("lock")
160160
private boolean stopped;
161+
@GuardedBy("lock")
162+
private boolean inUse;
161163
private SSLSocketFactory sslSocketFactory;
162164
private Socket socket;
163165
@GuardedBy("lock")
@@ -269,6 +271,7 @@ void streamReadyToStart(OkHttpClientStream clientStream) {
269271
clientStream.transportReportStatus(goAwayStatus, true, new Metadata());
270272
} else if (streams.size() >= maxConcurrentStreams) {
271273
pendingStreams.add(clientStream);
274+
setInUse();
272275
} else {
273276
startStream(clientStream);
274277
}
@@ -279,6 +282,7 @@ void streamReadyToStart(OkHttpClientStream clientStream) {
279282
private void startStream(OkHttpClientStream stream) {
280283
Preconditions.checkState(stream.id() == null, "StreamId already assigned");
281284
streams.put(nextStreamId, stream);
285+
setInUse();
282286
stream.start(nextStreamId);
283287
stream.allocated();
284288
// For unary and server streaming, there will be a data frame soon, no need to flush the header.
@@ -317,6 +321,7 @@ private boolean startPendingStreams() {
317321
@GuardedBy("lock")
318322
void removePendingStream(OkHttpClientStream pendingStream) {
319323
pendingStreams.remove(pendingStream);
324+
maybeClearInUse();
320325
}
321326

322327
@Override
@@ -476,6 +481,7 @@ public void shutdownNow(Status reason) {
476481
stream.transportReportStatus(reason, true, new Metadata());
477482
}
478483
pendingStreams.clear();
484+
maybeClearInUse();
479485

480486
stopIfNecessary();
481487
}
@@ -552,6 +558,7 @@ private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status stat
552558
stream.transportReportStatus(status, true, new Metadata());
553559
}
554560
pendingStreams.clear();
561+
maybeClearInUse();
555562

556563
stopIfNecessary();
557564
}
@@ -584,6 +591,7 @@ void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode err
584591
}
585592
if (!startPendingStreams()) {
586593
stopIfNecessary();
594+
maybeClearInUse();
587595
}
588596
}
589597
}
@@ -619,6 +627,24 @@ void stopIfNecessary() {
619627
frameWriter.close();
620628
}
621629

630+
@GuardedBy("lock")
631+
private void maybeClearInUse() {
632+
if (inUse) {
633+
if (pendingStreams.isEmpty() && streams.isEmpty()) {
634+
inUse = false;
635+
listener.transportInUse(false);
636+
}
637+
}
638+
}
639+
640+
@GuardedBy("lock")
641+
private void setInUse() {
642+
if (!inUse) {
643+
inUse = true;
644+
listener.transportInUse(true);
645+
}
646+
}
647+
622648
private Throwable getPingFailure() {
623649
synchronized (lock) {
624650
if (goAwayStatus != null) {

testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.junit.Assert.fail;
4141
import static org.junit.Assume.assumeTrue;
4242
import static org.mockito.Matchers.any;
43+
import static org.mockito.Matchers.anyBoolean;
4344
import static org.mockito.Matchers.eq;
4445
import static org.mockito.Matchers.same;
4546
import static org.mockito.Mockito.doAnswer;
@@ -226,6 +227,7 @@ public void serverNotListening() {
226227
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
227228
inOrder.verify(mockClientTransportListener).transportTerminated();
228229
verify(mockClientTransportListener, never()).transportReady();
230+
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
229231
}
230232

231233
@Test
@@ -238,6 +240,7 @@ public void clientStartStop() throws Exception {
238240
inOrder.verify(mockClientTransportListener).transportShutdown(statusCaptor.capture());
239241
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
240242
inOrder.verify(mockClientTransportListener).transportTerminated();
243+
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
241244
}
242245

243246
@Test
@@ -256,6 +259,7 @@ public void clientStartAndStopOnceConnected() throws Exception {
256259
server.shutdown();
257260
assertTrue(serverListener.waitForShutdown(TIMEOUT_MS, TimeUnit.MILLISECONDS));
258261
server = null;
262+
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
259263
}
260264

261265
@Test
@@ -277,6 +281,7 @@ public void openStreamPreventsTermination() throws Exception {
277281

278282
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata());
279283
clientStream.start(mockClientStreamListener);
284+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
280285
StreamCreation serverStreamCreation
281286
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
282287
ServerStream serverStream = serverStreamCreation.stream;
@@ -307,11 +312,13 @@ public void openStreamPreventsTermination() throws Exception {
307312
verify(mockServerStreamListener, timeout(TIMEOUT_MS)).halfClosed();
308313

309314
verify(mockClientTransportListener, never()).transportTerminated();
315+
verify(mockClientTransportListener, never()).transportInUse(false);
310316
assertFalse(serverTransportListener.isTerminated());
311317

312318
clientStream.cancel(Status.CANCELLED);
313319

314320
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
321+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
315322
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
316323
}
317324

@@ -325,6 +332,7 @@ public void shutdownNowKillsClientStream() throws Exception {
325332

326333
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata());
327334
clientStream.start(mockClientStreamListener);
335+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
328336
StreamCreation serverStreamCreation
329337
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
330338
ServerStream serverStream = serverStreamCreation.stream;
@@ -336,6 +344,7 @@ public void shutdownNowKillsClientStream() throws Exception {
336344

337345
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
338346
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
347+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
339348
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
340349
assertTrue(serverTransportListener.isTerminated());
341350

@@ -354,6 +363,7 @@ public void shutdownNowKillsServerStream() throws Exception {
354363

355364
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata());
356365
clientStream.start(mockClientStreamListener);
366+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
357367
StreamCreation serverStreamCreation
358368
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
359369
ServerStream serverStream = serverStreamCreation.stream;
@@ -364,6 +374,7 @@ public void shutdownNowKillsServerStream() throws Exception {
364374

365375
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportShutdown(any(Status.class));
366376
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
377+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
367378
assertTrue(serverTransportListener.waitForTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS));
368379
assertTrue(serverTransportListener.isTerminated());
369380

@@ -387,6 +398,7 @@ public void ping() throws Exception {
387398
assumeTrue(false);
388399
}
389400
verify(mockPingCallback, timeout(TIMEOUT_MS)).onSuccess(Matchers.anyInt());
401+
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
390402
}
391403

392404
@Test
@@ -470,9 +482,56 @@ public void newStream_afterTermination() throws Exception {
470482
stream.start(mockClientStreamListener);
471483
verify(mockClientStreamListener, timeout(TIMEOUT_MS))
472484
.closed(statusCaptor.capture(), any(Metadata.class));
485+
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
473486
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
474487
}
475488

489+
@Test
490+
public void transportInUse_normalClose() throws Exception {
491+
server.start(serverListener);
492+
client.start(mockClientTransportListener);
493+
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata());
494+
stream1.start(mockClientStreamListener);
495+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
496+
MockServerTransportListener serverTransportListener
497+
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
498+
StreamCreation serverStreamCreation1
499+
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
500+
ClientStream stream2 = client.newStream(methodDescriptor, new Metadata());
501+
stream2.start(mockClientStreamListener);
502+
StreamCreation serverStreamCreation2
503+
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
504+
505+
stream1.halfClose();
506+
serverStreamCreation1.stream.close(Status.OK, new Metadata());
507+
stream2.halfClose();
508+
verify(mockClientTransportListener, never()).transportInUse(false);
509+
serverStreamCreation2.stream.close(Status.OK, new Metadata());
510+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
511+
// Verify that the callback has been called only once for true and false respectively
512+
verify(mockClientTransportListener).transportInUse(true);
513+
verify(mockClientTransportListener).transportInUse(false);
514+
}
515+
516+
@Test
517+
public void transportInUse_clientCancel() throws Exception {
518+
server.start(serverListener);
519+
client.start(mockClientTransportListener);
520+
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata());
521+
stream1.start(mockClientStreamListener);
522+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(true);
523+
ClientStream stream2 = client.newStream(methodDescriptor, new Metadata());
524+
stream2.start(mockClientStreamListener);
525+
526+
stream1.cancel(Status.CANCELLED);
527+
verify(mockClientTransportListener, never()).transportInUse(false);
528+
stream2.cancel(Status.CANCELLED);
529+
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportInUse(false);
530+
// Verify that the callback has been called only once for true and false respectively
531+
verify(mockClientTransportListener).transportInUse(true);
532+
verify(mockClientTransportListener).transportInUse(false);
533+
}
534+
476535
@Test
477536
public void basicStream() throws Exception {
478537
server.start(serverListener);

0 commit comments

Comments
 (0)