1717package com .google .cloud .pubsub ;
1818
1919import com .google .api .gax .bundling .FlowController ;
20- import com .google .auth .oauth2 .GoogleCredentials ;
2120import com .google .common .base .Optional ;
2221import com .google .common .collect .ImmutableList ;
2322import com .google .common .primitives .Ints ;
2423import com .google .common .util .concurrent .FutureCallback ;
2524import com .google .common .util .concurrent .Futures ;
2625import com .google .common .util .concurrent .ListenableFuture ;
2726import com .google .common .util .concurrent .SettableFuture ;
28- import com .google .common .util .concurrent .ThreadFactoryBuilder ;
2927import com .google .pubsub .v1 .PublishRequest ;
3028import com .google .pubsub .v1 .PublishResponse ;
3129import com .google .pubsub .v1 .PublisherGrpc ;
3432import io .grpc .Channel ;
3533import io .grpc .Status ;
3634import io .grpc .auth .MoreCallCredentials ;
37- import io .grpc .netty .GrpcSslContexts ;
38- import io .grpc .netty .NegotiationType ;
39- import io .grpc .netty .NettyChannelBuilder ;
4035import java .io .IOException ;
41- import java .util .Collections ;
4236import java .util .Iterator ;
4337import java .util .LinkedList ;
4438import java .util .List ;
45- import java .util .concurrent .Executors ;
4639import java .util .concurrent .ScheduledExecutorService ;
4740import java .util .concurrent .ScheduledFuture ;
4841import java .util .concurrent .TimeUnit ;
@@ -90,7 +83,7 @@ final class PublisherImpl implements Publisher {
9083 private final Duration sendBundleDeadline ;
9184 private ScheduledFuture <?> currentAlarmFuture ;
9285
93- PublisherImpl (String topic , Settings settings ) throws IOException {
86+ PublisherImpl (String topic , PubSubOptions psOptions , Settings settings ) throws IOException {
9487 this .topic = topic ;
9588 maxBundleMessages = settings .getBundlingSettings ().getElementCountThreshold ();
9689 maxBundleBytes = settings .getBundlingSettings ().getRequestByteThreshold ();
@@ -108,36 +101,23 @@ final class PublisherImpl implements Publisher {
108101 messagesBundle = new LinkedList <>();
109102 messagesBundleLock = new ReentrantLock ();
110103 activeAlarm = new AtomicBoolean (false );
111- int numCores = Math .max (1 , Runtime .getRuntime ().availableProcessors ());
112- executor =
113- settings .getExecutor ().isPresent ()
114- ? settings .getExecutor ().get ()
115- : Executors .newScheduledThreadPool (
116- numCores * DEFAULT_MIN_THREAD_POOL_SIZE ,
117- new ThreadFactoryBuilder ()
118- .setDaemon (true )
119- .setNameFormat ("cloud-pubsub-publisher-thread-%d" )
120- .build ());
104+ shutdown = new AtomicBoolean (false );
105+ messagesWaiter = new MessagesWaiter ();
106+
107+ int numCores = Runtime .getRuntime ().availableProcessors ();
108+ executor = psOptions .getExecutorFactory ().get ();
109+
121110 channels = new Channel [numCores ];
122- channelIndex = new AtomicLong (0 );
123111 for (int i = 0 ; i < numCores ; i ++) {
124- channels [i ] =
125- settings .getChannelBuilder ().isPresent ()
126- ? settings .getChannelBuilder ().get ().build ()
127- : NettyChannelBuilder .forAddress (Publisher .Settings .PUBSUB_API_ADDRESS , 443 )
128- .negotiationType (NegotiationType .TLS )
129- .sslContext (GrpcSslContexts .forClient ().ciphers (null ).build ())
130- .executor (executor )
131- .build ();
112+ if (psOptions .getChannelProvider ().needsExecutor ()) {
113+ channels [i ] = psOptions .getChannelProvider ().getChannel (executor );
114+ } else {
115+ channels [i ] = psOptions .getChannelProvider ().getChannel ();
116+ }
132117 }
133- credentials =
134- MoreCallCredentials .from (
135- settings .getUserCredentials ().isPresent ()
136- ? settings .getUserCredentials ().get ()
137- : GoogleCredentials .getApplicationDefault ()
138- .createScoped (Collections .singletonList (Publisher .Settings .PUBSUB_API_SCOPE )));
139- shutdown = new AtomicBoolean (false );
140- messagesWaiter = new MessagesWaiter ();
118+ channelIndex = new AtomicLong (0 );
119+
120+ credentials = MoreCallCredentials .from (psOptions .getCredentials ());
141121 }
142122
143123 @ Override
0 commit comments