Skip to content

Commit 53da588

Browse files
authored
Move multiple-port ServerImpl to NettyServer (grpc#7674)
Change InternalServer to handle multiple addresses and implemented in NettyServer. It makes ServerImpl to have a single transport server, and this single transport server (NettyServer) will bind to all listening addresses during bootstrap. (grpc#7674)
1 parent ccef406 commit 53da588

File tree

20 files changed

+543
-182
lines changed

20 files changed

+543
-182
lines changed

core/src/main/java/io/grpc/inprocess/InProcessServer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,21 @@ public SocketAddress getListenSocketAddress() {
8282
return new InProcessSocketAddress(name);
8383
}
8484

85+
@Override
86+
public List<? extends SocketAddress> getListenSocketAddresses() {
87+
return Collections.singletonList(getListenSocketAddress());
88+
}
89+
8590
@Override
8691
public InternalInstrumented<SocketStats> getListenSocketStats() {
8792
return null;
8893
}
8994

95+
@Override
96+
public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
97+
return null;
98+
}
99+
90100
@Override
91101
public void shutdown() {
92102
if (!registry.remove(name, this)) {

core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
3434
import io.grpc.internal.SharedResourcePool;
3535
import java.io.File;
36-
import java.util.Collections;
3736
import java.util.List;
3837
import java.util.UUID;
3938
import java.util.concurrent.ScheduledExecutorService;
@@ -109,7 +108,7 @@ private InProcessServerBuilder(String name) {
109108

110109
final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
111110
@Override
112-
public List<? extends InternalServer> buildClientTransportServers(
111+
public InternalServer buildClientTransportServers(
113112
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
114113
return buildTransportServers(streamTracerFactories);
115114
}
@@ -187,9 +186,9 @@ public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
187186
return this;
188187
}
189188

190-
List<InProcessServer> buildTransportServers(
189+
InProcessServer buildTransportServers(
191190
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
192-
return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
191+
return new InProcessServer(this, streamTracerFactories);
193192
}
194193

195194
@Override

core/src/main/java/io/grpc/internal/InternalServer.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
import io.grpc.InternalInstrumented;
2121
import java.io.IOException;
2222
import java.net.SocketAddress;
23+
import java.util.List;
2324
import javax.annotation.Nullable;
2425
import javax.annotation.concurrent.ThreadSafe;
2526

2627
/**
27-
* An object that accepts new incoming connections. This would commonly encapsulate a bound socket
28-
* that {@code accept()}s new connections.
28+
* An object that accepts new incoming connections on one or more listening socket addresses.
29+
* This would commonly encapsulate a bound socket that {@code accept()}s new connections.
2930
*/
3031
@ThreadSafe
3132
public interface InternalServer {
@@ -49,13 +50,25 @@ public interface InternalServer {
4950
void shutdown();
5051

5152
/**
52-
* Returns the listening socket address. May change after {@link start(ServerListener)} is
53+
* Returns the first listening socket address. May change after {@link start(ServerListener)} is
5354
* called.
5455
*/
5556
SocketAddress getListenSocketAddress();
5657

5758
/**
58-
* Returns the listen socket stats of this server. May return {@code null}.
59+
* Returns the first listen socket stats of this server. May return {@code null}.
5960
*/
6061
@Nullable InternalInstrumented<SocketStats> getListenSocketStats();
62+
63+
/**
64+
* Returns a list of listening socket addresses. May change after {@link start(ServerListener)}
65+
* is called.
66+
*/
67+
List<? extends SocketAddress> getListenSocketAddresses();
68+
69+
/**
70+
* Returns a list of listen socket stats of this server. May return {@code null}.
71+
*/
72+
@Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
73+
6174
}

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
110110
@GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
111111
@GuardedBy("lock") private boolean terminated;
112112
/** Service encapsulating something similar to an accept() socket. */
113-
private final List<? extends InternalServer> transportServers;
113+
private final InternalServer transportServer;
114114
private final Object lock = new Object();
115115
@GuardedBy("lock") private boolean transportServersTerminated;
116116
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
117117
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
118-
@GuardedBy("lock") private int activeTransportServers;
119118

120119
private final Context rootContext;
121120

@@ -131,20 +130,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
131130
* Construct a server.
132131
*
133132
* @param builder builder with configuration for server
134-
* @param transportServers transport servers that will create new incoming transports
133+
* @param transportServer transport servers that will create new incoming transports
135134
* @param rootContext context that callbacks for new RPCs should be derived from
136135
*/
137136
ServerImpl(
138137
ServerImplBuilder builder,
139-
List<? extends InternalServer> transportServers,
138+
InternalServer transportServer,
140139
Context rootContext) {
141140
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
142141
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
143142
this.fallbackRegistry =
144143
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
145-
Preconditions.checkNotNull(transportServers, "transportServers");
146-
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
147-
this.transportServers = new ArrayList<>(transportServers);
144+
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
148145
this.logId =
149146
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
150147
// Fork from the passed in context so that it does not propagate cancellation, it only
@@ -179,10 +176,7 @@ public ServerImpl start() throws IOException {
179176
// Start and wait for any ports to actually be bound.
180177

181178
ServerListenerImpl listener = new ServerListenerImpl();
182-
for (InternalServer ts : transportServers) {
183-
ts.start(listener);
184-
activeTransportServers++;
185-
}
179+
transportServer.start(listener);
186180
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
187181
started = true;
188182
return this;
@@ -195,8 +189,7 @@ public int getPort() {
195189
synchronized (lock) {
196190
checkState(started, "Not started");
197191
checkState(!terminated, "Already terminated");
198-
for (InternalServer ts : transportServers) {
199-
SocketAddress addr = ts.getListenSocketAddress();
192+
for (SocketAddress addr: transportServer.getListenSocketAddresses()) {
200193
if (addr instanceof InetSocketAddress) {
201194
return ((InetSocketAddress) addr).getPort();
202195
}
@@ -216,11 +209,7 @@ public List<SocketAddress> getListenSockets() {
216209

217210
private List<SocketAddress> getListenSocketsIgnoringLifecycle() {
218211
synchronized (lock) {
219-
List<SocketAddress> addrs = new ArrayList<>(transportServers.size());
220-
for (InternalServer ts : transportServers) {
221-
addrs.add(ts.getListenSocketAddress());
222-
}
223-
return Collections.unmodifiableList(addrs);
212+
return Collections.unmodifiableList(transportServer.getListenSocketAddresses());
224213
}
225214
}
226215

@@ -268,9 +257,7 @@ public ServerImpl shutdown() {
268257
}
269258
}
270259
if (shutdownTransportServers) {
271-
for (InternalServer ts : transportServers) {
272-
ts.shutdown();
273-
}
260+
transportServer.shutdown();
274261
}
275262
return this;
276263
}
@@ -388,8 +375,7 @@ public void serverShutdown() {
388375
ArrayList<ServerTransport> copiedTransports;
389376
Status shutdownNowStatusCopy;
390377
synchronized (lock) {
391-
activeTransportServers--;
392-
if (activeTransportServers != 0) {
378+
if (serverShutdownCallbackInvoked) {
393379
return;
394380
}
395381

@@ -662,12 +648,9 @@ public InternalLogId getLogId() {
662648
@Override
663649
public ListenableFuture<ServerStats> getStats() {
664650
ServerStats.Builder builder = new ServerStats.Builder();
665-
for (InternalServer ts : transportServers) {
666-
// TODO(carl-mastrangelo): remove the list and just add directly.
667-
InternalInstrumented<SocketStats> stats = ts.getListenSocketStats();
668-
if (stats != null ) {
669-
builder.addListenSockets(Collections.singletonList(stats));
670-
}
651+
List<InternalInstrumented<SocketStats>> stats = transportServer.getListenSocketStatsList();
652+
if (stats != null ) {
653+
builder.addListenSockets(stats);
671654
}
672655
serverCallTracer.updateBuilder(builder);
673656
SettableFuture<ServerStats> ret = SettableFuture.create();
@@ -679,7 +662,7 @@ public ListenableFuture<ServerStats> getStats() {
679662
public String toString() {
680663
return MoreObjects.toStringHelper(this)
681664
.add("logId", logId.getId())
682-
.add("transportServers", transportServers)
665+
.add("transportServer", transportServer)
683666
.toString();
684667
}
685668

core/src/main/java/io/grpc/internal/ServerImplBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public static ServerBuilder<?> forPort(int port) {
9797
* is meant for Transport implementors and should not be used by normal users.
9898
*/
9999
public interface ClientTransportServersBuilder {
100-
List<? extends InternalServer> buildClientTransportServers(
100+
InternalServer buildClientTransportServers(
101101
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
102102
}
103103

core/src/test/java/io/grpc/inprocess/InProcessServerBuilderTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.junit.Assert.assertNotNull;
2323
import static org.junit.Assert.assertSame;
2424

25-
import com.google.common.collect.Iterables;
2625
import io.grpc.ServerStreamTracer;
2726
import io.grpc.internal.FakeClock;
2827
import io.grpc.internal.ObjectPool;
@@ -55,8 +54,8 @@ public void generateName() {
5554
@Test
5655
public void scheduledExecutorService_default() {
5756
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
58-
InProcessServer server = Iterables.getOnlyElement(
59-
builder.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>()));
57+
InProcessServer server =
58+
builder.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>());
6059

6160
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
6261
server.getScheduledExecutorServicePool();
@@ -80,8 +79,8 @@ public void scheduledExecutorService_custom() {
8079
InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
8180
assertSame(builder, builder1);
8281

83-
InProcessServer server = Iterables.getOnlyElement(
84-
builder1.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>()));
82+
InProcessServer server =
83+
builder1.buildTransportServers(new ArrayList<ServerStreamTracer.Factory>());
8584
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
8685
server.getScheduledExecutorServicePool();
8786

core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.fail;
2121

22-
import com.google.common.collect.ImmutableList;
2322
import io.grpc.CallOptions;
2423
import io.grpc.ManagedChannel;
2524
import io.grpc.Metadata;
@@ -55,16 +54,16 @@ public class InProcessTransportTest extends AbstractTransportTest {
5554
public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
5655

5756
@Override
58-
protected List<? extends InternalServer> newServer(
57+
protected InternalServer newServer(
5958
List<ServerStreamTracer.Factory> streamTracerFactories) {
6059
InProcessServerBuilder builder = InProcessServerBuilder
6160
.forName(TRANSPORT_NAME)
6261
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
63-
return ImmutableList.of(new InProcessServer(builder, streamTracerFactories));
62+
return new InProcessServer(builder, streamTracerFactories);
6463
}
6564

6665
@Override
67-
protected List<? extends InternalServer> newServer(
66+
protected InternalServer newServer(
6867
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
6968
return newServer(streamTracerFactories);
7069
}

core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.grpc.inprocess;
1818

19-
import com.google.common.collect.ImmutableList;
2019
import io.grpc.InternalChannelz.SocketStats;
2120
import io.grpc.InternalInstrumented;
2221
import io.grpc.ServerStreamTracer;
@@ -31,6 +30,7 @@
3130
import io.grpc.internal.SharedResourcePool;
3231
import java.io.IOException;
3332
import java.net.SocketAddress;
33+
import java.util.Collections;
3434
import java.util.List;
3535
import java.util.concurrent.ScheduledExecutorService;
3636
import javax.annotation.Nullable;
@@ -52,13 +52,13 @@ public final class StandaloneInProcessTransportTest extends AbstractTransportTes
5252
private TestServer currentServer;
5353

5454
@Override
55-
protected List<? extends InternalServer> newServer(
55+
protected InternalServer newServer(
5656
List<ServerStreamTracer.Factory> streamTracerFactories) {
57-
return ImmutableList.of(new TestServer(streamTracerFactories));
57+
return new TestServer(streamTracerFactories);
5858
}
5959

6060
@Override
61-
protected List<? extends InternalServer> newServer(
61+
protected InternalServer newServer(
6262
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
6363
return newServer(streamTracerFactories);
6464
}
@@ -126,11 +126,22 @@ public SocketAddress getListenSocketAddress() {
126126
return new SocketAddress() {};
127127
}
128128

129+
@Override
130+
public List<SocketAddress> getListenSocketAddresses() {
131+
return Collections.singletonList(getListenSocketAddress());
132+
}
133+
129134
@Override
130135
@Nullable
131136
public InternalInstrumented<SocketStats> getListenSocketStats() {
132137
return null;
133138
}
139+
140+
@Override
141+
@Nullable
142+
public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
143+
return null;
144+
}
134145
}
135146

136147
/** Wraps the server listener to ensure we don't accept new transports after shutdown. */

core/src/test/java/io/grpc/internal/AbstractTransportTest.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import static org.mockito.Mockito.verify;
4141

4242
import com.google.common.base.Objects;
43-
import com.google.common.collect.Iterables;
4443
import com.google.common.collect.Lists;
4544
import com.google.common.io.ByteStreams;
4645
import com.google.common.util.concurrent.MoreExecutors;
@@ -118,13 +117,13 @@ public long currentTimeNanos() {
118117
* Returns a new server that when started will be able to be connected to from the client. Each
119118
* returned instance should be new and yet be accessible by new client transports.
120119
*/
121-
protected abstract List<? extends InternalServer> newServer(
120+
protected abstract InternalServer newServer(
122121
List<ServerStreamTracer.Factory> streamTracerFactories);
123122

124123
/**
125124
* Builds a new server that is listening on the same port as the given server instance does.
126125
*/
127-
protected abstract List<? extends InternalServer> newServer(
126+
protected abstract InternalServer newServer(
128127
int port, List<ServerStreamTracer.Factory> streamTracerFactories);
129128

130129
/**
@@ -230,7 +229,7 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
230229

231230
@Before
232231
public void setUp() {
233-
server = Iterables.getOnlyElement(newServer(Arrays.asList(serverStreamTracerFactory)));
232+
server = newServer(Arrays.asList(serverStreamTracerFactory));
234233
callOptions = CallOptions.DEFAULT.withStreamTracerFactory(clientStreamTracerFactory);
235234
}
236235

@@ -401,8 +400,7 @@ public void serverAlreadyListening() throws Exception {
401400
if (addr instanceof InetSocketAddress) {
402401
port = ((InetSocketAddress) addr).getPort();
403402
}
404-
InternalServer server2 =
405-
Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
403+
InternalServer server2 = newServer(port, Arrays.asList(serverStreamTracerFactory));
406404
thrown.expect(IOException.class);
407405
server2.start(new MockServerListener());
408406
}
@@ -421,7 +419,7 @@ public void serverStartInterrupted() throws Exception {
421419
assumeTrue("transport is not using InetSocketAddress", port != -1);
422420
server.shutdown();
423421

424-
server = Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
422+
server = newServer(port, Arrays.asList(serverStreamTracerFactory));
425423
boolean success;
426424
Thread.currentThread().interrupt();
427425
try {
@@ -473,7 +471,7 @@ public void openStreamPreventsTermination() throws Exception {
473471
// resources. There may be cases this is impossible in the future, but for now it is a useful
474472
// property.
475473
serverListener = new MockServerListener();
476-
server = Iterables.getOnlyElement(newServer(port, Arrays.asList(serverStreamTracerFactory)));
474+
server = newServer(port, Arrays.asList(serverStreamTracerFactory));
477475
server.start(serverListener);
478476

479477
// Try to "flush" out any listener notifications on client and server. This also ensures that

0 commit comments

Comments
 (0)