2222import com .google .api .stats .Distribution ;
2323import com .google .auth .Credentials ;
2424import com .google .cloud .Clock ;
25- import com .google .cloud .pubsub .spi .v1 .MessagesProcessor . AcksProcessor ;
26- import com .google .cloud .pubsub .spi .v1 .MessagesProcessor .PendingModifyAckDeadline ;
25+ import com .google .cloud .pubsub .spi .v1 .MessageDispatcher . AckProcessor ;
26+ import com .google .cloud .pubsub .spi .v1 .MessageDispatcher .PendingModifyAckDeadline ;
2727import com .google .common .collect .Lists ;
2828import com .google .common .util .concurrent .AbstractService ;
2929import com .google .common .util .concurrent .FutureCallback ;
4949import org .slf4j .LoggerFactory ;
5050
5151/** Implementation of {@link AbstractSubscriberConnection} based on Cloud Pub/Sub streaming pull. */
52- final class StreamingSubscriberConnection extends AbstractService implements AcksProcessor {
52+ final class StreamingSubscriberConnection extends AbstractService implements AckProcessor {
5353 private static final Logger logger = LoggerFactory .getLogger (StreamingSubscriberConnection .class );
5454
5555 private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = new Duration (100 ); // 100ms
@@ -62,7 +62,7 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack
6262
6363 private final String subscription ;
6464 private final ScheduledExecutorService executor ;
65- private final MessagesProcessor messagesProcessor ;
65+ private final MessageDispatcher messageDispatcher ;
6666 private ClientCallStreamObserver <StreamingPullRequest > requestObserver ;
6767
6868 public StreamingSubscriberConnection (
@@ -80,16 +80,16 @@ public StreamingSubscriberConnection(
8080 this .executor = executor ;
8181 this .credentials = credentials ;
8282 this .channel = channel ;
83- this .messagesProcessor =
84- new MessagesProcessor (
83+ this .messageDispatcher =
84+ new MessageDispatcher (
8585 receiver ,
8686 this ,
8787 ackExpirationPadding ,
8888 ackLatencyDistribution ,
8989 flowController ,
9090 executor ,
9191 clock );
92- messagesProcessor .setMessageDeadlineSeconds (streamAckDeadlineSeconds );
92+ messageDispatcher .setMessageDeadlineSeconds (streamAckDeadlineSeconds );
9393 }
9494
9595 @ Override
@@ -101,7 +101,7 @@ protected void doStart() {
101101
102102 @ Override
103103 protected void doStop () {
104- messagesProcessor .stop ();
104+ messageDispatcher .stop ();
105105 notifyStopped ();
106106 requestObserver .onError (Status .CANCELLED .asException ());
107107 }
@@ -123,7 +123,7 @@ public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestOb
123123
124124 @ Override
125125 public void onNext (StreamingPullResponse response ) {
126- messagesProcessor .processReceivedMessages (response .getReceivedMessagesList ());
126+ messageDispatcher .processReceivedMessages (response .getReceivedMessagesList ());
127127 // Only if not shutdown we will request one more bundles of messages to be delivered.
128128 if (isAlive ()) {
129129 requestObserver .request (1 );
@@ -157,11 +157,11 @@ private void initialize() {
157157 logger .debug (
158158 "Initializing stream to subscription {} with deadline {}" ,
159159 subscription ,
160- messagesProcessor .getMessageDeadlineSeconds ());
160+ messageDispatcher .getMessageDeadlineSeconds ());
161161 requestObserver .onNext (
162162 StreamingPullRequest .newBuilder ()
163163 .setSubscription (subscription )
164- .setStreamAckDeadlineSeconds (messagesProcessor .getMessageDeadlineSeconds ())
164+ .setStreamAckDeadlineSeconds (messageDispatcher .getMessageDeadlineSeconds ())
165165 .build ());
166166 requestObserver .request (1 );
167167
@@ -240,7 +240,7 @@ public void sendAckOperations(
240240 }
241241
242242 public void updateStreamAckDeadline (int newAckDeadlineSeconds ) {
243- messagesProcessor .setMessageDeadlineSeconds (newAckDeadlineSeconds );
243+ messageDispatcher .setMessageDeadlineSeconds (newAckDeadlineSeconds );
244244 requestObserver .onNext (
245245 StreamingPullRequest .newBuilder ()
246246 .setStreamAckDeadlineSeconds (newAckDeadlineSeconds )
0 commit comments