Skip to content

Commit c6fd94c

Browse files
committed
Implement shutdownNow
Fixes grpc#448
1 parent 631a9d5 commit c6fd94c

18 files changed

Lines changed: 529 additions & 19 deletions

core/src/main/java/io/grpc/ManagedChannel.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ public abstract class ManagedChannel extends Channel {
6464
* Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
6565
* forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
6666
* return {@code false} immediately after this method returns.
67-
*
68-
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
6967
*/
7068
public abstract ManagedChannel shutdownNow();
7169

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@
5252

5353
import java.io.InputStream;
5454
import java.util.ArrayDeque;
55+
import java.util.ArrayList;
5556
import java.util.HashSet;
57+
import java.util.List;
5658
import java.util.Set;
5759
import java.util.concurrent.Executor;
5860
import java.util.logging.Level;
@@ -171,6 +173,22 @@ public synchronized void shutdown() {
171173
}
172174
}
173175

176+
@Override
177+
public void shutdownNow(Status reason) {
178+
checkNotNull(reason, "reason");
179+
List<InProcessStream> streamsCopy;
180+
synchronized (this) {
181+
shutdown();
182+
if (terminated) {
183+
return;
184+
}
185+
streamsCopy = new ArrayList<InProcessStream>(streams);
186+
}
187+
for (InProcessStream stream : streamsCopy) {
188+
stream.clientStream.cancel(reason);
189+
}
190+
}
191+
174192
@Override
175193
public String toString() {
176194
return getLogId() + "(" + name + ")";
@@ -460,6 +478,7 @@ public synchronized boolean isReady() {
460478
return serverRequested > 0;
461479
}
462480

481+
// Must be thread-safe for shutdownNow()
463482
@Override
464483
public void cancel(Status reason) {
465484
if (!internalCancel(stripCause(reason))) {

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public void shutdown() {
155155
* Shuts down this transport and cancels all streams that it owns, hence immediately terminates
156156
* this transport.
157157
*/
158+
@Override
158159
public void shutdownNow(Status status) {
159160
shutdown();
160161
Collection<PendingStream> savedPendingStreams = null;

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
128128
new HashSet<DelayedClientTransport>();
129129

130130
@GuardedBy("lock")
131-
private final HashSet<OobTransportProvider<ClientTransport>> oobTransports =
132-
new HashSet<OobTransportProvider<ClientTransport>>();
131+
private final HashSet<OobTransportProviderImpl> oobTransports =
132+
new HashSet<OobTransportProviderImpl>();
133133

134134
@GuardedBy("lock")
135135
private boolean shutdown;
136136
@GuardedBy("lock")
137+
private boolean shutdownNowed;
138+
@GuardedBy("lock")
137139
private boolean terminated;
138140

139141
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@@ -255,6 +257,8 @@ public ManagedChannelImpl shutdown() {
255257
ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
256258
ArrayList<DelayedClientTransport> delayedTransportsCopy =
257259
new ArrayList<DelayedClientTransport>();
260+
ArrayList<OobTransportProviderImpl> oobTransportsCopy =
261+
new ArrayList<OobTransportProviderImpl>();
258262
synchronized (lock) {
259263
if (shutdown) {
260264
return this;
@@ -266,6 +270,7 @@ public ManagedChannelImpl shutdown() {
266270
if (!terminated) {
267271
transportsCopy.addAll(transports.values());
268272
delayedTransportsCopy.addAll(delayedTransports);
273+
oobTransportsCopy.addAll(oobTransports);
269274
}
270275
}
271276
loadBalancer.shutdown();
@@ -276,7 +281,7 @@ public ManagedChannelImpl shutdown() {
276281
for (DelayedClientTransport transport : delayedTransportsCopy) {
277282
transport.shutdown();
278283
}
279-
for (OobTransportProvider<ClientTransport> provider : oobTransports) {
284+
for (OobTransportProviderImpl provider : oobTransportsCopy) {
280285
provider.close();
281286
}
282287
if (log.isLoggable(Level.FINE)) {
@@ -289,14 +294,39 @@ public ManagedChannelImpl shutdown() {
289294
* Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
290295
* forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
291296
* return {@code false} immediately after this method returns.
292-
*
293-
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
294297
*/
295-
// TODO(ejona86): cancel preexisting calls.
296298
@Override
297299
public ManagedChannelImpl shutdownNow() {
300+
synchronized (lock) {
301+
// Short-circuiting not strictly necessary, but prevents transports from needing to handle
302+
// multiple shutdownNow invocations.
303+
if (shutdownNowed) {
304+
return this;
305+
}
306+
shutdownNowed = true;
307+
}
298308
shutdown();
299-
// TODO(zhangkun): also call shutdownNow() on oobTransports.
309+
List<TransportSet> transportsCopy;
310+
List<DelayedClientTransport> delayedTransportsCopy;
311+
List<OobTransportProviderImpl> oobTransportsCopy;
312+
synchronized (lock) {
313+
transportsCopy = new ArrayList<TransportSet>(transports.values());
314+
delayedTransportsCopy = new ArrayList<DelayedClientTransport>(delayedTransports);
315+
oobTransportsCopy = new ArrayList<OobTransportProviderImpl>(oobTransports);
316+
}
317+
if (log.isLoggable(Level.FINE)) {
318+
log.log(Level.FINE, "[{0}] Shutting down now", getLogId());
319+
}
320+
Status nowStatus = Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
321+
for (TransportSet ts : transportsCopy) {
322+
ts.shutdownNow(nowStatus);
323+
}
324+
for (DelayedClientTransport transport : delayedTransportsCopy) {
325+
transport.shutdownNow(nowStatus);
326+
}
327+
for (OobTransportProviderImpl provider : oobTransportsCopy) {
328+
provider.shutdownNow(nowStatus);
329+
}
300330
return this;
301331
}
302332

@@ -549,5 +579,11 @@ public void close() {
549579
transportSet.shutdown();
550580
}
551581
}
582+
583+
void shutdownNow(Status reason) {
584+
if (transportSet != null) {
585+
transportSet.shutdownNow(reason);
586+
}
587+
}
552588
}
553589
}

core/src/main/java/io/grpc/internal/ManagedClientTransport.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId {
6868
*/
6969
void shutdown();
7070

71+
/**
72+
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
73+
* should be closed with the provided {@code reason}.
74+
*/
75+
void shutdownNow(Status reason);
76+
7177
/**
7278
* Receives notifications for the transport life-cycle events. Implementation does not need to be
7379
* thread-safe, so notifications must be properly synchronized externally.

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public final class ServerImpl extends io.grpc.Server {
8484
private final HandlerRegistry fallbackRegistry;
8585
private boolean started;
8686
private boolean shutdown;
87+
/** non-{@code null} if immediate shutdown has been requested. */
88+
private Status shutdownNowStatus;
89+
/** {@code true} if ServerListenerImpl.serverShutdown() was called. */
90+
private boolean serverShutdownCallbackInvoked;
8791
private boolean terminated;
8892
/** Service encapsulating something similar to an accept() socket. */
8993
private final InternalServer transportServer;
@@ -179,13 +183,31 @@ public ServerImpl shutdown() {
179183
return this;
180184
}
181185

182-
/**
183-
* NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
184-
*/
185-
// TODO(ejona86): cancel preexisting calls.
186186
@Override
187187
public ServerImpl shutdownNow() {
188188
shutdown();
189+
Collection<ServerTransport> transportsCopy;
190+
Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked");
191+
boolean savedServerShutdownCallbackInvoked;
192+
synchronized (lock) {
193+
// Short-circuiting not strictly necessary, but prevents transports from needing to handle
194+
// multiple shutdownNow invocations if shutdownNow is called multiple times.
195+
if (shutdownNowStatus != null) {
196+
return this;
197+
}
198+
shutdownNowStatus = nowStatus;
199+
transportsCopy = new ArrayList<ServerTransport>(transports);
200+
savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked;
201+
}
202+
// Short-circuiting not strictly necessary, but prevents transports from needing to handle
203+
// multiple shutdownNow invocations, between here and the serverShutdown callback.
204+
if (serverShutdownCallbackInvoked) {
205+
// Have to call shutdownNow, because serverShutdown callback only called shutdown, not
206+
// shutdownNow
207+
for (ServerTransport transport : transportsCopy) {
208+
transport.shutdownNow(nowStatus);
209+
}
210+
}
189211
return this;
190212
}
191213

@@ -265,13 +287,20 @@ public ServerTransportListener transportCreated(ServerTransport transport) {
265287
@Override
266288
public void serverShutdown() {
267289
ArrayList<ServerTransport> copiedTransports;
290+
Status shutdownNowStatusCopy;
268291
synchronized (lock) {
269292
// transports collection can be modified during shutdown(), even if we hold the lock, due
270293
// to reentrancy.
271294
copiedTransports = new ArrayList<ServerTransport>(transports);
295+
shutdownNowStatusCopy = shutdownNowStatus;
296+
serverShutdownCallbackInvoked = true;
272297
}
273298
for (ServerTransport transport : copiedTransports) {
274-
transport.shutdown();
299+
if (shutdownNowStatusCopy == null) {
300+
transport.shutdown();
301+
} else {
302+
transport.shutdownNow(shutdownNowStatusCopy);
303+
}
275304
}
276305
synchronized (lock) {
277306
transportServerTerminated = true;

core/src/main/java/io/grpc/internal/ServerTransport.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131

3232
package io.grpc.internal;
3333

34+
import io.grpc.Status;
35+
3436
/** An inbound connection. */
3537
public interface ServerTransport {
3638
/**
@@ -39,4 +41,10 @@ public interface ServerTransport {
3941
* be processed on a separate thread. May only be called once.
4042
*/
4143
void shutdown();
44+
45+
/**
46+
* Initiates a forceful shutdown in which preexisting and new calls are closed. Existing calls
47+
* should be closed with the provided {@code reason}.
48+
*/
49+
void shutdownNow(Status reason);
4250
}

core/src/main/java/io/grpc/internal/TransportSet.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,17 @@ final void shutdown() {
292292
}
293293
}
294294

295+
void shutdownNow(Status reason) {
296+
shutdown();
297+
Collection<ManagedClientTransport> transportsCopy;
298+
synchronized (lock) {
299+
transportsCopy = new ArrayList<ManagedClientTransport>(transports);
300+
}
301+
for (ManagedClientTransport transport : transportsCopy) {
302+
transport.shutdownNow(reason);
303+
}
304+
}
305+
295306
@GuardedBy("lock")
296307
private void cancelReconnectTask() {
297308
if (reconnectTask != null) {

core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,113 @@ public void twoCallsAndGracefulShutdown() {
264264
verifyNoMoreInteractions(mockStream);
265265
}
266266

267+
@Test
268+
public void callAndShutdownNow() {
269+
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
270+
ManagedChannel channel = createChannel(nameResolverFactory, NO_INTERCEPTOR);
271+
verifyNoMoreInteractions(mockTransportFactory);
272+
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
273+
verifyNoMoreInteractions(mockTransportFactory);
274+
275+
// Create transport and call
276+
ClientStream mockStream = mock(ClientStream.class);
277+
Metadata headers = new Metadata();
278+
when(mockTransportFactory.newClientTransport(
279+
any(SocketAddress.class), any(String.class), any(String.class)))
280+
.thenReturn(mockTransport);
281+
when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream);
282+
call.start(mockCallListener, headers);
283+
verify(mockTransportFactory, timeout(1000))
284+
.newClientTransport(same(socketAddress), eq(authority), any(String.class));
285+
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
286+
ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
287+
transportListener.transportReady();
288+
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
289+
verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture());
290+
verify(mockStream).setCompressor(isA(Compressor.class));
291+
// Depends on how quick the real transport is created, ClientCallImpl may start on mockStream
292+
// directly, or on a DelayedStream which later starts mockStream. In the former case,
293+
// setMessageCompression() is not called. In the latter case, it is (in
294+
// DelayedStream.startStream()).
295+
verify(mockStream, atMost(1)).setMessageCompression(anyBoolean());
296+
ClientStreamListener streamListener = streamListenerCaptor.getValue();
297+
298+
// ShutdownNow
299+
channel.shutdownNow();
300+
assertTrue(channel.isShutdown());
301+
assertFalse(channel.isTerminated());
302+
// ShutdownNow may or may not invoke shutdown. Ideally it wouldn't, but it doesn't matter much
303+
// either way.
304+
verify(mockTransport, atMost(1)).shutdown();
305+
verify(mockTransport).shutdownNow(any(Status.class));
306+
assertEquals(1, nameResolverFactory.resolvers.size());
307+
assertTrue(nameResolverFactory.resolvers.get(0).shutdown);
308+
assertEquals(1, loadBalancerFactory.balancers.size());
309+
verify(loadBalancerFactory.balancers.get(0)).shutdown();
310+
311+
// Further calls should fail without going to the transport
312+
ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
313+
call3.start(mockCallListener3, new Metadata());
314+
verify(mockCallListener3, timeout(1000))
315+
.onClose(statusCaptor.capture(), any(Metadata.class));
316+
assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
317+
318+
// Finish shutdown
319+
transportListener.transportShutdown(Status.CANCELLED);
320+
assertFalse(channel.isTerminated());
321+
Metadata trailers = new Metadata();
322+
streamListener.closed(Status.CANCELLED, trailers);
323+
verify(mockCallListener, timeout(1000)).onClose(Status.CANCELLED, trailers);
324+
assertFalse(channel.isTerminated());
325+
326+
transportListener.transportTerminated();
327+
assertTrue(channel.isTerminated());
328+
329+
verify(mockTransportFactory).close();
330+
verifyNoMoreInteractions(mockTransportFactory);
331+
verify(mockTransport, atLeast(0)).getLogId();
332+
verifyNoMoreInteractions(mockTransport);
333+
verifyNoMoreInteractions(mockStream);
334+
}
335+
336+
/** Make sure shutdownNow() after shutdown() has an effect. */
337+
@Test
338+
public void callAndShutdownAndShutdownNow() {
339+
ManagedChannel channel = createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
340+
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
341+
342+
// Create transport and call
343+
ClientStream mockStream = mock(ClientStream.class);
344+
Metadata headers = new Metadata();
345+
when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream);
346+
call.start(mockCallListener, headers);
347+
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
348+
ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
349+
transportListener.transportReady();
350+
verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture());
351+
ClientStreamListener streamListener = streamListenerCaptor.getValue();
352+
353+
// ShutdownNow
354+
channel.shutdown();
355+
channel.shutdownNow();
356+
// ShutdownNow may or may not invoke shutdown. Ideally it wouldn't, but it doesn't matter much
357+
// either way.
358+
verify(mockTransport, atMost(2)).shutdown();
359+
verify(mockTransport).shutdownNow(any(Status.class));
360+
361+
// Finish shutdown
362+
transportListener.transportShutdown(Status.CANCELLED);
363+
assertFalse(channel.isTerminated());
364+
Metadata trailers = new Metadata();
365+
streamListener.closed(Status.CANCELLED, trailers);
366+
verify(mockCallListener, timeout(1000)).onClose(Status.CANCELLED, trailers);
367+
assertFalse(channel.isTerminated());
368+
369+
transportListener.transportTerminated();
370+
assertTrue(channel.isTerminated());
371+
}
372+
373+
267374
@Test
268375
public void interceptor() throws Exception {
269376
final AtomicLong atomic = new AtomicLong();

0 commit comments

Comments
 (0)