|
16 | 16 |
|
17 | 17 | package com.google.cloud.pubsub.spi; |
18 | 18 |
|
| 19 | +import com.google.api.gax.core.ForwardingRpcFuture; |
| 20 | +import com.google.api.gax.core.Function; |
| 21 | +import com.google.api.gax.core.RpcFuture; |
| 22 | +import com.google.api.gax.core.RpcFutureCallback; |
19 | 23 | import com.google.api.gax.grpc.ApiException; |
20 | 24 | import com.google.api.gax.grpc.ChannelProvider; |
21 | 25 | import com.google.api.gax.grpc.ExecutorProvider; |
22 | 26 | import com.google.api.gax.grpc.FixedChannelProvider; |
23 | 27 | import com.google.api.gax.grpc.FixedExecutorProvider; |
24 | 28 | import com.google.api.gax.grpc.ProviderManager; |
25 | | -import com.google.api.gax.grpc.RpcFuture; |
26 | | -import com.google.api.gax.grpc.RpcFutureCallback; |
27 | 29 | import com.google.api.gax.grpc.UnaryCallSettings; |
28 | 30 | import com.google.cloud.GrpcServiceOptions.ExecutorFactory; |
29 | 31 | import com.google.cloud.NoCredentials; |
|
65 | 67 | import io.grpc.netty.NettyChannelBuilder; |
66 | 68 | import java.io.IOException; |
67 | 69 | import java.util.Set; |
68 | | -import java.util.concurrent.ExecutionException; |
69 | 70 | import java.util.concurrent.Future; |
70 | 71 | import java.util.concurrent.ScheduledExecutorService; |
71 | | -import java.util.concurrent.TimeUnit; |
72 | | -import java.util.concurrent.TimeoutException; |
73 | 72 | import org.joda.time.Duration; |
74 | 73 |
|
75 | 74 | public class DefaultPubSubRpc implements PubSubRpc { |
@@ -107,48 +106,19 @@ protected ChannelProvider getChannelProvider() { |
107 | 106 | } |
108 | 107 | } |
109 | 108 |
|
110 | | - private static final class PullFutureImpl |
| 109 | + private static final class PullFutureImpl extends ForwardingRpcFuture<PullResponse> |
111 | 110 | implements PullFuture { |
112 | | - |
113 | | - private final RpcFuture<PullResponse> delegate; |
114 | | - |
115 | 111 | PullFutureImpl(RpcFuture<PullResponse> delegate) { |
116 | | - this.delegate = delegate; |
117 | | - } |
118 | | - |
119 | | - @Override |
120 | | - public boolean cancel(boolean mayInterruptIfRunning) { |
121 | | - return delegate.cancel(mayInterruptIfRunning); |
122 | | - } |
123 | | - |
124 | | - @Override |
125 | | - public PullResponse get() throws InterruptedException, ExecutionException { |
126 | | - return delegate.get(); |
127 | | - } |
128 | | - |
129 | | - @Override |
130 | | - public PullResponse get(long timeout, TimeUnit unit) |
131 | | - throws InterruptedException, ExecutionException, TimeoutException { |
132 | | - return delegate.get(timeout, unit); |
133 | | - } |
134 | | - |
135 | | - @Override |
136 | | - public boolean isCancelled() { |
137 | | - return delegate.isCancelled(); |
138 | | - } |
139 | | - |
140 | | - @Override |
141 | | - public boolean isDone() { |
142 | | - return delegate.isDone(); |
| 112 | + super(delegate); |
143 | 113 | } |
144 | 114 |
|
145 | 115 | @Override |
146 | 116 | public void addCallback(final PullCallback callback) { |
147 | | - delegate.addCallback( |
| 117 | + addCallback( |
148 | 118 | new RpcFutureCallback<PullResponse>() { |
149 | 119 | @Override |
150 | | - public void onSuccess(PullResponse result) { |
151 | | - callback.success(result); |
| 120 | + public void onSuccess(PullResponse response) { |
| 121 | + callback.success(response); |
152 | 122 | } |
153 | 123 |
|
154 | 124 | @Override |
@@ -211,7 +181,7 @@ private static <V> RpcFuture<V> translate( |
211 | 181 | } |
212 | 182 | return from.catching( |
213 | 183 | ApiException.class, |
214 | | - new RpcFuture.Function<ApiException, V>() { |
| 184 | + new Function<ApiException, V>() { |
215 | 185 | @Override |
216 | 186 | public V apply(ApiException exception) { |
217 | 187 | if (returnNullOnSet.contains(exception.getStatusCode().value())) { |
|
0 commit comments