Skip to content

Commit 7d464fc

Browse files
committed
inprocess: Avoid creating unnecessary threads
Implementations of ManagedClientTransport.start() are restricted from calling the passed listener until start() returns, in order to avoid reentrency problems with locks. For most transports this isn't a problem, because they need additional threads anyway. InProcess uses no additional threads naturally so ends up needing a thread just to notifyReady. Now transports can just return a Runnable that can be run after locks are dropped. This was originally intended to be a performance optimization, but the thread also causes nondeterminism because RPCs are delayed until notifyReady is called. So avoiding the thread reduces needless fakes during tests.
1 parent a8700a7 commit 7d464fc

File tree

8 files changed

+85
-59
lines changed

8 files changed

+85
-59
lines changed

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.logging.Level;
6363
import java.util.logging.Logger;
6464

65+
import javax.annotation.CheckReturnValue;
6566
import javax.annotation.concurrent.GuardedBy;
6667
import javax.annotation.concurrent.ThreadSafe;
6768

@@ -89,8 +90,9 @@ public InProcessTransport(String name) {
8990
.build();
9091
}
9192

93+
@CheckReturnValue
9294
@Override
93-
public synchronized void start(ManagedClientTransport.Listener listener) {
95+
public synchronized Runnable start(ManagedClientTransport.Listener listener) {
9496
this.clientTransportListener = listener;
9597
InProcessServer server = InProcessServer.findServer(name);
9698
if (server != null) {
@@ -99,31 +101,24 @@ public synchronized void start(ManagedClientTransport.Listener listener) {
99101
if (serverTransportListener == null) {
100102
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name);
101103
final Status localShutdownStatus = shutdownStatus;
102-
Thread shutdownThread = new Thread(new Runnable() {
104+
return new Runnable() {
103105
@Override
104106
public void run() {
105107
synchronized (InProcessTransport.this) {
106108
notifyShutdown(localShutdownStatus);
107109
notifyTerminated();
108110
}
109111
}
110-
});
111-
shutdownThread.setDaemon(true);
112-
shutdownThread.setName("grpc-inprocess-shutdown");
113-
shutdownThread.start();
114-
return;
112+
};
115113
}
116-
Thread readyThread = new Thread(new Runnable() {
114+
return new Runnable() {
117115
@Override
118116
public void run() {
119117
synchronized (InProcessTransport.this) {
120118
clientTransportListener.transportReady();
121119
}
122120
}
123-
});
124-
readyThread.setDaemon(true);
125-
readyThread.setName("grpc-inprocess-ready");
126-
readyThread.start();
121+
};
127122
}
128123

129124
@Override

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ class DelayedClientTransport implements ManagedClientTransport {
9696
}
9797

9898
@Override
99-
public void start(Listener listener) {
99+
public Runnable start(Listener listener) {
100100
this.listener = Preconditions.checkNotNull(listener, "listener");
101+
return null;
101102
}
102103

103104
/**

core/src/main/java/io/grpc/internal/ForwardingConnectionClientTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141

4242
abstract class ForwardingConnectionClientTransport implements ConnectionClientTransport {
4343
@Override
44-
public void start(Listener listener) {
45-
delegate().start(listener);
44+
public Runnable start(Listener listener) {
45+
return delegate().start(listener);
4646
}
4747

4848
@Override

core/src/main/java/io/grpc/internal/ManagedClientTransport.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import io.grpc.Status;
3535

36+
import javax.annotation.CheckReturnValue;
37+
import javax.annotation.Nullable;
3638
import javax.annotation.concurrent.ThreadSafe;
3739

3840
/**
@@ -53,12 +55,16 @@ public interface ManagedClientTransport extends ClientTransport, WithLogId {
5355
* Starts transport. This method may only be called once.
5456
*
5557
* <p>Implementations must not call {@code listener} from within {@link #start}; implementations
56-
* are expected to notify listener on a separate thread. This method should not throw any
57-
* exceptions.
58+
* are expected to notify listener on a separate thread or when the returned {@link Runnable} is
59+
* run. This method and the returned {@code Runnable} should not throw any exceptions.
5860
*
5961
* @param listener non-{@code null} listener of transport events
62+
* @return a {@link Runnable} that is executed after-the-fact by the original caller, typically
63+
* after locks are released
6064
*/
61-
void start(Listener listener);
65+
@CheckReturnValue
66+
@Nullable
67+
Runnable start(Listener listener);
6268

6369
/**
6470
* Initiates an orderly shutdown of the transport. Existing streams continue, but the transport

core/src/main/java/io/grpc/internal/TransportSet.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.logging.Level;
5151
import java.util.logging.Logger;
5252

53+
import javax.annotation.CheckReturnValue;
5354
import javax.annotation.Nullable;
5455
import javax.annotation.concurrent.GuardedBy;
5556
import javax.annotation.concurrent.ThreadSafe;
@@ -173,25 +174,32 @@ final ClientTransport obtainActiveTransport() {
173174
if (savedTransport != null) {
174175
return savedTransport;
175176
}
177+
Runnable runnable;
176178
synchronized (lock) {
177179
// Check again, since it could have changed before acquiring the lock
178-
if (activeTransport == null) {
179-
if (shutdown) {
180-
return SHUTDOWN_TRANSPORT;
181-
}
182-
// Transition to CONNECTING
183-
DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor);
184-
transports.add(delayedTransport);
185-
delayedTransport.start(new BaseTransportListener(delayedTransport));
186-
activeTransport = delayedTransport;
187-
startNewTransport(delayedTransport);
180+
savedTransport = activeTransport;
181+
if (savedTransport != null) {
182+
return savedTransport;
183+
}
184+
if (shutdown) {
185+
return SHUTDOWN_TRANSPORT;
188186
}
189-
return activeTransport;
187+
// Transition to CONNECTING
188+
DelayedClientTransport delayedTransport = new DelayedClientTransport(appExecutor);
189+
transports.add(delayedTransport);
190+
delayedTransport.start(new BaseTransportListener(delayedTransport));
191+
savedTransport = activeTransport = delayedTransport;
192+
runnable = startNewTransport(delayedTransport);
190193
}
194+
if (runnable != null) {
195+
runnable.run();
196+
}
197+
return savedTransport;
191198
}
192199

200+
@CheckReturnValue
193201
@GuardedBy("lock")
194-
private void startNewTransport(DelayedClientTransport delayedTransport) {
202+
private Runnable startNewTransport(DelayedClientTransport delayedTransport) {
195203
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
196204

197205
if (nextAddressIndex == 0) {
@@ -211,7 +219,7 @@ private void startNewTransport(DelayedClientTransport delayedTransport) {
211219
}
212220
pendingTransport = transport;
213221
transports.add(transport);
214-
transport.start(new TransportListener(transport, delayedTransport, address));
222+
return transport.start(new TransportListener(transport, delayedTransport, address));
215223
}
216224

217225
/**
@@ -239,17 +247,21 @@ public void run() {
239247
try {
240248
delayedTransport.endBackoff();
241249
boolean shutdownDelayedTransport = false;
250+
Runnable runnable = null;
242251
synchronized (lock) {
243252
reconnectTask = null;
244253
if (delayedTransport.hasPendingStreams()) {
245254
// Transition directly to CONNECTING
246-
startNewTransport(delayedTransport);
255+
runnable = startNewTransport(delayedTransport);
247256
} else {
248257
// Transition to IDLE (or already SHUTDOWN)
249258
activeTransport = null;
250259
shutdownDelayedTransport = true;
251260
}
252261
}
262+
if (runnable != null) {
263+
runnable.run();
264+
}
253265
if (shutdownDelayedTransport) {
254266
delayedTransport.setTransportSupplier(new Supplier<ClientTransport>() {
255267
@Override
@@ -425,6 +437,7 @@ public void transportShutdown(Status s) {
425437
new Object[] {getLogId(), transport.getLogId(), address, s});
426438
}
427439
super.transportShutdown(s);
440+
Runnable runnable = null;
428441
synchronized (lock) {
429442
if (activeTransport == transport) {
430443
// This is true only if the transport was ready.
@@ -440,10 +453,13 @@ public void transportShutdown(Status s) {
440453
scheduleBackoff(delayedTransport, s);
441454
} else {
442455
// Still CONNECTING
443-
startNewTransport(delayedTransport);
456+
runnable = startNewTransport(delayedTransport);
444457
}
445458
}
446459
}
460+
if (runnable != null) {
461+
runnable.run();
462+
}
447463
loadBalancer.handleTransportShutdown(addressGroup, s);
448464
if (allAddressesFailed) {
449465
callback.onAllAddressesFailed();

netty/src/main/java/io/grpc/netty/NettyClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
133133
}
134134

135135
@Override
136-
public void start(Listener transportListener) {
136+
public Runnable start(Listener transportListener) {
137137
lifecycleManager = new ClientTransportLifecycleManager(
138138
Preconditions.checkNotNull(transportListener, "listener"));
139139

@@ -191,6 +191,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
191191
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
192192
}
193193
});
194+
return null;
194195
}
195196

196197
@Override

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ void removePendingStream(OkHttpClientStream pendingStream) {
344344
}
345345

346346
@Override
347-
public void start(Listener listener) {
347+
public Runnable start(Listener listener) {
348348
this.listener = Preconditions.checkNotNull(listener, "listener");
349349

350350
if (enableKeepAlive) {
@@ -433,6 +433,7 @@ public void close() {}
433433
}
434434
}
435435
});
436+
return null;
436437
}
437438

438439
@Override

0 commit comments

Comments
 (0)