Skip to content

Commit 73074a5

Browse files
committed
split options
1 parent 2beb34e commit 73074a5

5 files changed

Lines changed: 96 additions & 96 deletions

File tree

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public Future<AsyncPage<Topic>> listTopicsAsync(ListOption... options) {
289289
public Publisher newPublisher(String topic, Publisher.Settings settings) throws PubSubException {
290290
String topicName = PublisherClient.formatTopicName(getOptions().getProjectId(), topic);
291291
try {
292-
return new PublisherImpl(topicName, settings);
292+
return new PublisherImpl(topicName, getOptions(), settings);
293293
} catch (IOException e) {
294294
throw new PubSubException(e, false);
295295
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubOptions.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616

1717
package com.google.cloud.pubsub;
1818

19+
import com.google.api.gax.grpc.ChannelProvider;
1920
import com.google.cloud.GrpcServiceOptions;
2021
import com.google.cloud.pubsub.spi.DefaultPubSubRpc;
2122
import com.google.cloud.pubsub.spi.PubSubRpc;
2223
import com.google.cloud.pubsub.spi.PubSubRpcFactory;
2324
import com.google.cloud.pubsub.spi.v1.PublisherSettings;
2425
import com.google.common.collect.ImmutableSet;
25-
2626
import java.io.IOException;
2727
import java.util.Set;
2828
import java.util.concurrent.ScheduledExecutorService;
@@ -103,6 +103,11 @@ protected ExecutorFactory<ScheduledExecutorService> getExecutorFactory() {
103103
return super.getExecutorFactory();
104104
}
105105

106+
@Override
107+
protected ChannelProvider getChannelProvider() {
108+
return super.getChannelProvider();
109+
}
110+
106111
@Override
107112
protected PubSubFactory getDefaultServiceFactory() {
108113
return DefaultPubSubFactory.INSTANCE;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/Publisher.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818

1919
import com.google.api.gax.bundling.FlowController;
2020
import com.google.api.gax.grpc.BundlingSettings;
21-
import com.google.auth.Credentials;
2221
import com.google.auth.oauth2.GoogleCredentials;
2322
import com.google.auto.value.AutoValue;
2423
import com.google.common.base.Optional;
2524
import com.google.common.base.Preconditions;
2625
import com.google.common.util.concurrent.ListenableFuture;
2726
import com.google.pubsub.v1.PubsubMessage;
28-
import io.grpc.ManagedChannelBuilder;
29-
import java.util.concurrent.ScheduledExecutorService;
3027
import org.joda.time.Duration;
3128

3229
/**
@@ -176,23 +173,12 @@ public abstract class Settings {
176173

177174
abstract Duration getRequestTimeout();
178175

179-
abstract Optional<Credentials> getUserCredentials();
180-
181-
abstract Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>
182-
getChannelBuilder();
183-
184-
abstract Optional<ScheduledExecutorService> getExecutor();
185-
186176
public static Builder newBuilder() {
187177
return new AutoValue_Publisher_Settings.Builder()
188178
.setFlowControlSettings(FlowController.Settings.DEFAULT)
189179
.setFailOnFlowControlLimits(false)
190180
.setSendBundleDeadline(MIN_SEND_BUNDLE_DURATION)
191181
.setRequestTimeout(DEFAULT_REQUEST_TIMEOUT)
192-
.setUserCredentials(Optional.<Credentials>absent())
193-
.setChannelBuilder(
194-
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>absent())
195-
.setExecutor(Optional.<ScheduledExecutorService>absent())
196182
.setBundlingSettings(DEFAULT_BUNDLING_SETTINGS);
197183
}
198184

@@ -208,26 +194,6 @@ abstract static class Builder {
208194

209195
abstract Builder setRequestTimeout(Duration value);
210196

211-
abstract Builder setUserCredentials(Optional<Credentials> value);
212-
213-
Builder setUserCredentials(Credentials value) {
214-
return setUserCredentials(Optional.of(value));
215-
}
216-
217-
abstract Builder setChannelBuilder(
218-
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> value);
219-
220-
Builder setChannelBuilder(ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> value) {
221-
return setChannelBuilder(
222-
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(value));
223-
}
224-
225-
abstract Builder setExecutor(Optional<ScheduledExecutorService> value);
226-
227-
Builder setExecutor(ScheduledExecutorService value) {
228-
return setExecutor(Optional.of(value));
229-
}
230-
231197
abstract Settings autoBuild();
232198

233199
public Settings build() {

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PublisherImpl.java

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717
package com.google.cloud.pubsub;
1818

1919
import com.google.api.gax.bundling.FlowController;
20-
import com.google.auth.oauth2.GoogleCredentials;
2120
import com.google.common.base.Optional;
2221
import com.google.common.collect.ImmutableList;
2322
import com.google.common.primitives.Ints;
2423
import com.google.common.util.concurrent.FutureCallback;
2524
import com.google.common.util.concurrent.Futures;
2625
import com.google.common.util.concurrent.ListenableFuture;
2726
import com.google.common.util.concurrent.SettableFuture;
28-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2927
import com.google.pubsub.v1.PublishRequest;
3028
import com.google.pubsub.v1.PublishResponse;
3129
import com.google.pubsub.v1.PublisherGrpc;
@@ -34,15 +32,10 @@
3432
import io.grpc.Channel;
3533
import io.grpc.Status;
3634
import io.grpc.auth.MoreCallCredentials;
37-
import io.grpc.netty.GrpcSslContexts;
38-
import io.grpc.netty.NegotiationType;
39-
import io.grpc.netty.NettyChannelBuilder;
4035
import java.io.IOException;
41-
import java.util.Collections;
4236
import java.util.Iterator;
4337
import java.util.LinkedList;
4438
import java.util.List;
45-
import java.util.concurrent.Executors;
4639
import java.util.concurrent.ScheduledExecutorService;
4740
import java.util.concurrent.ScheduledFuture;
4841
import java.util.concurrent.TimeUnit;
@@ -90,7 +83,7 @@ final class PublisherImpl implements Publisher {
9083
private final Duration sendBundleDeadline;
9184
private ScheduledFuture<?> currentAlarmFuture;
9285

93-
PublisherImpl(String topic, Settings settings) throws IOException {
86+
PublisherImpl(String topic, PubSubOptions psOptions, Settings settings) throws IOException {
9487
this.topic = topic;
9588
maxBundleMessages = settings.getBundlingSettings().getElementCountThreshold();
9689
maxBundleBytes = settings.getBundlingSettings().getRequestByteThreshold();
@@ -108,36 +101,23 @@ final class PublisherImpl implements Publisher {
108101
messagesBundle = new LinkedList<>();
109102
messagesBundleLock = new ReentrantLock();
110103
activeAlarm = new AtomicBoolean(false);
111-
int numCores = Math.max(1, Runtime.getRuntime().availableProcessors());
112-
executor =
113-
settings.getExecutor().isPresent()
114-
? settings.getExecutor().get()
115-
: Executors.newScheduledThreadPool(
116-
numCores * DEFAULT_MIN_THREAD_POOL_SIZE,
117-
new ThreadFactoryBuilder()
118-
.setDaemon(true)
119-
.setNameFormat("cloud-pubsub-publisher-thread-%d")
120-
.build());
104+
shutdown = new AtomicBoolean(false);
105+
messagesWaiter = new MessagesWaiter();
106+
107+
int numCores = Runtime.getRuntime().availableProcessors();
108+
executor = psOptions.getExecutorFactory().get();
109+
121110
channels = new Channel[numCores];
122-
channelIndex = new AtomicLong(0);
123111
for (int i = 0; i < numCores; i++) {
124-
channels[i] =
125-
settings.getChannelBuilder().isPresent()
126-
? settings.getChannelBuilder().get().build()
127-
: NettyChannelBuilder.forAddress(Publisher.Settings.PUBSUB_API_ADDRESS, 443)
128-
.negotiationType(NegotiationType.TLS)
129-
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
130-
.executor(executor)
131-
.build();
112+
if (psOptions.getChannelProvider().needsExecutor()) {
113+
channels[i] = psOptions.getChannelProvider().getChannel(executor);
114+
} else {
115+
channels[i] = psOptions.getChannelProvider().getChannel();
116+
}
132117
}
133-
credentials =
134-
MoreCallCredentials.from(
135-
settings.getUserCredentials().isPresent()
136-
? settings.getUserCredentials().get()
137-
: GoogleCredentials.getApplicationDefault()
138-
.createScoped(Collections.singletonList(Publisher.Settings.PUBSUB_API_SCOPE)));
139-
shutdown = new AtomicBoolean(false);
140-
messagesWaiter = new MessagesWaiter();
118+
channelIndex = new AtomicLong(0);
119+
120+
credentials = MoreCallCredentials.from(psOptions.getCredentials());
141121
}
142122

143123
@Override

0 commit comments

Comments
 (0)