Skip to content

Commit 4b1f0e9

Browse files
authored
Use Futures.transform instead of lazyTransform when possible (#1168)
1 parent d7c51de commit 4b1f0e9

2 files changed

Lines changed: 50 additions & 33 deletions

File tree

gcloud-java-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import static com.google.cloud.logging.Logging.WriteOption.OptionType.LABELS;
2525
import static com.google.cloud.logging.Logging.WriteOption.OptionType.LOG_NAME;
2626
import static com.google.cloud.logging.Logging.WriteOption.OptionType.RESOURCE;
27-
import static com.google.common.util.concurrent.Futures.lazyTransform;
2827

2928
import com.google.cloud.AsyncPage;
3029
import com.google.cloud.AsyncPageImpl;
@@ -43,6 +42,8 @@
4342
import com.google.common.collect.Iterables;
4443
import com.google.common.collect.Lists;
4544
import com.google.common.collect.Maps;
45+
import com.google.common.util.concurrent.Futures;
46+
import com.google.common.util.concurrent.ListenableFuture;
4647
import com.google.common.util.concurrent.Uninterruptibles;
4748
import com.google.logging.v2.CreateLogMetricRequest;
4849
import com.google.logging.v2.CreateSinkRequest;
@@ -103,6 +104,14 @@ private static <V> V get(Future<V> future) {
103104
}
104105
}
105106

107+
private static <I, O> Future<O> transform(Future<I> future,
108+
Function<? super I, ? extends O> function) {
109+
if (future instanceof ListenableFuture) {
110+
return Futures.transform((ListenableFuture<I>) future, function);
111+
}
112+
return Futures.lazyTransform(future, function);
113+
}
114+
106115
private abstract static class BasePageFetcher<T> implements AsyncPageImpl.NextPageFetcher<T> {
107116

108117
private static final long serialVersionUID = 5095123855547444030L;
@@ -198,7 +207,7 @@ public Future<Sink> createAsync(SinkInfo sink) {
198207
.setParent(ConfigServiceV2Api.formatParentName(options().projectId()))
199208
.setSink(sink.toPb(options().projectId()))
200209
.build();
201-
return lazyTransform(rpc.create(request), Sink.fromPbFunction(this));
210+
return transform(rpc.create(request), Sink.fromPbFunction(this));
202211
}
203212

204213
@Override
@@ -212,7 +221,7 @@ public Future<Sink> updateAsync(SinkInfo sink) {
212221
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink.name()))
213222
.setSink(sink.toPb(options().projectId()))
214223
.build();
215-
return lazyTransform(rpc.update(request), Sink.fromPbFunction(this));
224+
return transform(rpc.update(request), Sink.fromPbFunction(this));
216225
}
217226

218227
@Override
@@ -225,7 +234,7 @@ public Future<Sink> getSinkAsync(String sink) {
225234
GetSinkRequest request = GetSinkRequest.newBuilder()
226235
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
227236
.build();
228-
return lazyTransform(rpc.get(request), Sink.fromPbFunction(this));
237+
return transform(rpc.get(request), Sink.fromPbFunction(this));
229238
}
230239

231240
private static ListSinksRequest listSinksRequest(LoggingOptions serviceOptions,
@@ -247,7 +256,7 @@ private static Future<AsyncPage<Sink>> listSinksAsync(final LoggingOptions servi
247256
final Map<Option.OptionType, ?> options) {
248257
final ListSinksRequest request = listSinksRequest(serviceOptions, options);
249258
Future<ListSinksResponse> list = serviceOptions.rpc().list(request);
250-
return lazyTransform(list, new Function<ListSinksResponse, AsyncPage<Sink>>() {
259+
return transform(list, new Function<ListSinksResponse, AsyncPage<Sink>>() {
251260
@Override
252261
public AsyncPage<Sink> apply(ListSinksResponse listSinksResponse) {
253262
List<Sink> sinks = listSinksResponse.getSinksList() == null ? ImmutableList.<Sink>of()
@@ -281,7 +290,7 @@ public Future<Boolean> deleteSinkAsync(String sink) {
281290
DeleteSinkRequest request = DeleteSinkRequest.newBuilder()
282291
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
283292
.build();
284-
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
293+
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
285294
}
286295

287296
public boolean deleteLog(String log) {
@@ -292,7 +301,7 @@ public Future<Boolean> deleteLogAsync(String log) {
292301
DeleteLogRequest request = DeleteLogRequest.newBuilder()
293302
.setLogName(LoggingServiceV2Api.formatLogName(options().projectId(), log))
294303
.build();
295-
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
304+
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
296305
}
297306

298307
private static ListMonitoredResourceDescriptorsRequest listMonitoredResourceDescriptorsRequest(
@@ -316,7 +325,7 @@ private static ListMonitoredResourceDescriptorsRequest listMonitoredResourceDesc
316325
final ListMonitoredResourceDescriptorsRequest request =
317326
listMonitoredResourceDescriptorsRequest(options);
318327
Future<ListMonitoredResourceDescriptorsResponse> list = serviceOptions.rpc().list(request);
319-
return lazyTransform(list, new Function<ListMonitoredResourceDescriptorsResponse,
328+
return transform(list, new Function<ListMonitoredResourceDescriptorsResponse,
320329
AsyncPage<MonitoredResourceDescriptor>>() {
321330
@Override
322331
public AsyncPage<MonitoredResourceDescriptor> apply(
@@ -355,7 +364,7 @@ public Future<Metric> createAsync(MetricInfo metric) {
355364
.setParent(MetricsServiceV2Api.formatParentName(options().projectId()))
356365
.setMetric(metric.toPb())
357366
.build();
358-
return lazyTransform(rpc.create(request), Metric.fromPbFunction(this));
367+
return transform(rpc.create(request), Metric.fromPbFunction(this));
359368
}
360369

361370
@Override
@@ -369,7 +378,7 @@ public Future<Metric> updateAsync(MetricInfo metric) {
369378
.setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric.name()))
370379
.setMetric(metric.toPb())
371380
.build();
372-
return lazyTransform(rpc.update(request), Metric.fromPbFunction(this));
381+
return transform(rpc.update(request), Metric.fromPbFunction(this));
373382
}
374383

375384
@Override
@@ -382,7 +391,7 @@ public Future<Metric> getMetricAsync(String metric) {
382391
GetLogMetricRequest request = GetLogMetricRequest.newBuilder()
383392
.setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric))
384393
.build();
385-
return lazyTransform(rpc.get(request), Metric.fromPbFunction(this));
394+
return transform(rpc.get(request), Metric.fromPbFunction(this));
386395
}
387396

388397
private static ListLogMetricsRequest listMetricsRequest(LoggingOptions serviceOptions,
@@ -404,7 +413,7 @@ private static Future<AsyncPage<Metric>> listMetricsAsync(final LoggingOptions s
404413
final Map<Option.OptionType, ?> options) {
405414
final ListLogMetricsRequest request = listMetricsRequest(serviceOptions, options);
406415
Future<ListLogMetricsResponse> list = serviceOptions.rpc().list(request);
407-
return lazyTransform(list, new Function<ListLogMetricsResponse, AsyncPage<Metric>>() {
416+
return transform(list, new Function<ListLogMetricsResponse, AsyncPage<Metric>>() {
408417
@Override
409418
public AsyncPage<Metric> apply(ListLogMetricsResponse listMetricsResponse) {
410419
List<Metric> metrics = listMetricsResponse.getMetricsList() == null
@@ -438,7 +447,7 @@ public Future<Boolean> deleteMetricAsync(String metric) {
438447
DeleteLogMetricRequest request = DeleteLogMetricRequest.newBuilder()
439448
.setMetricName(MetricsServiceV2Api.formatMetricName(options().projectId(), metric))
440449
.build();
441-
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
450+
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
442451
}
443452

444453
private static WriteLogEntriesRequest writeLogEntriesRequest(LoggingOptions serviceOptions,
@@ -466,9 +475,8 @@ public void write(Iterable<LogEntry> logEntries, WriteOption... options) {
466475
}
467476

468477
public Future<Void> writeAsync(Iterable<LogEntry> logEntries, WriteOption... options) {
469-
return lazyTransform(
470-
rpc.write(writeLogEntriesRequest(options(), logEntries, optionMap(options))),
471-
WRITE_RESPONSE_TO_VOID_FUNCTION);
478+
return transform(rpc.write(writeLogEntriesRequest(options(), logEntries, optionMap(options))),
479+
WRITE_RESPONSE_TO_VOID_FUNCTION);
472480
}
473481

474482
private static ListLogEntriesRequest listLogEntriesRequest(LoggingOptions serviceOptions,
@@ -498,7 +506,7 @@ private static Future<AsyncPage<LogEntry>> listLogEntriesAsync(
498506
final LoggingOptions serviceOptions, final Map<Option.OptionType, ?> options) {
499507
final ListLogEntriesRequest request = listLogEntriesRequest(serviceOptions, options);
500508
Future<ListLogEntriesResponse> list = serviceOptions.rpc().list(request);
501-
return lazyTransform(list, new Function<ListLogEntriesResponse, AsyncPage<LogEntry>>() {
509+
return transform(list, new Function<ListLogEntriesResponse, AsyncPage<LogEntry>>() {
502510
@Override
503511
public AsyncPage<LogEntry> apply(ListLogEntriesResponse listLogEntrysResponse) {
504512
List<LogEntry> entries = listLogEntrysResponse.getEntriesList() == null

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.EXECUTOR_FACTORY;
2222
import static com.google.cloud.pubsub.PubSub.PullOption.OptionType.MAX_QUEUED_CALLBACKS;
2323
import static com.google.common.base.Preconditions.checkArgument;
24-
import static com.google.common.util.concurrent.Futures.lazyTransform;
2524

2625
import com.google.cloud.AsyncPage;
2726
import com.google.cloud.AsyncPageImpl;
@@ -40,6 +39,8 @@
4039
import com.google.common.collect.Iterators;
4140
import com.google.common.collect.Lists;
4241
import com.google.common.collect.Maps;
42+
import com.google.common.util.concurrent.Futures;
43+
import com.google.common.util.concurrent.ListenableFuture;
4344
import com.google.common.util.concurrent.Uninterruptibles;
4445
import com.google.protobuf.Empty;
4546
import com.google.pubsub.v1.AcknowledgeRequest;
@@ -187,14 +188,22 @@ private static <V> V get(Future<V> future) {
187188
}
188189
}
189190

191+
private static <I, O> Future<O> transform(Future<I> future,
192+
Function<? super I, ? extends O> function) {
193+
if (future instanceof ListenableFuture) {
194+
return Futures.transform((ListenableFuture<I>) future, function);
195+
}
196+
return Futures.lazyTransform(future, function);
197+
}
198+
190199
@Override
191200
public Topic create(TopicInfo topic) {
192201
return get(createAsync(topic));
193202
}
194203

195204
@Override
196205
public Future<Topic> createAsync(TopicInfo topic) {
197-
return lazyTransform(rpc.create(topic.toPb(options().projectId())), Topic.fromPbFunction(this));
206+
return transform(rpc.create(topic.toPb(options().projectId())), Topic.fromPbFunction(this));
198207
}
199208

200209
@Override
@@ -207,7 +216,7 @@ public Future<Topic> getTopicAsync(String topic) {
207216
GetTopicRequest request = GetTopicRequest.newBuilder()
208217
.setTopic(PublisherApi.formatTopicName(options().projectId(), topic))
209218
.build();
210-
return lazyTransform(rpc.get(request), Topic.fromPbFunction(this));
219+
return transform(rpc.get(request), Topic.fromPbFunction(this));
211220
}
212221

213222
@Override
@@ -220,7 +229,7 @@ public Future<Boolean> deleteTopicAsync(String topic) {
220229
DeleteTopicRequest request = DeleteTopicRequest.newBuilder()
221230
.setTopic(PublisherApi.formatTopicName(options().projectId(), topic))
222231
.build();
223-
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
232+
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
224233
}
225234

226235
private static ListTopicsRequest listTopicsRequest(PubSubOptions serviceOptions,
@@ -242,7 +251,7 @@ private static Future<AsyncPage<Topic>> listTopicsAsync(final PubSubOptions serv
242251
final Map<Option.OptionType, ?> options) {
243252
final ListTopicsRequest request = listTopicsRequest(serviceOptions, options);
244253
Future<ListTopicsResponse> list = serviceOptions.rpc().list(request);
245-
return lazyTransform(list, new Function<ListTopicsResponse, AsyncPage<Topic>>() {
254+
return transform(list, new Function<ListTopicsResponse, AsyncPage<Topic>>() {
246255
@Override
247256
public AsyncPage<Topic> apply(ListTopicsResponse listTopicsResponse) {
248257
List<Topic> topics = listTopicsResponse.getTopicsList() == null ? ImmutableList.<Topic>of()
@@ -281,7 +290,7 @@ private static PublishRequest publishRequest(PubSubOptions serviceOptions, Strin
281290

282291
@Override
283292
public Future<String> publishAsync(String topic, Message message) {
284-
return lazyTransform(
293+
return transform(
285294
rpc.publish(publishRequest(options(), topic, Collections.singletonList(message))),
286295
new Function<PublishResponse, String>() {
287296
@Override
@@ -308,7 +317,7 @@ public List<String> publish(String topic, Iterable<Message> messages) {
308317

309318
@Override
310319
public Future<List<String>> publishAsync(String topic, Iterable<Message> messages) {
311-
return lazyTransform(rpc.publish(publishRequest(options(), topic, messages)),
320+
return transform(rpc.publish(publishRequest(options(), topic, messages)),
312321
new Function<PublishResponse, List<String>>() {
313322
@Override
314323
public List<String> apply(PublishResponse publishResponse) {
@@ -324,7 +333,7 @@ public Subscription create(SubscriptionInfo subscription) {
324333

325334
@Override
326335
public Future<Subscription> createAsync(SubscriptionInfo subscription) {
327-
return lazyTransform(rpc.create(subscription.toPb(options().projectId())),
336+
return transform(rpc.create(subscription.toPb(options().projectId())),
328337
Subscription.fromPbFunction(this));
329338
}
330339

@@ -338,7 +347,7 @@ public Future<Subscription> getSubscriptionAsync(String subscription) {
338347
GetSubscriptionRequest request = GetSubscriptionRequest.newBuilder()
339348
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
340349
.build();
341-
return lazyTransform(rpc.get(request), Subscription.fromPbFunction(this));
350+
return transform(rpc.get(request), Subscription.fromPbFunction(this));
342351
}
343352

344353
@Override
@@ -353,7 +362,7 @@ public Future<Void> replacePushConfigAsync(String subscription, PushConfig pushC
353362
.setPushConfig(pushConfig != null ? pushConfig.toPb()
354363
: com.google.pubsub.v1.PushConfig.getDefaultInstance())
355364
.build();
356-
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
365+
return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
357366
}
358367

359368
@Override
@@ -366,7 +375,7 @@ public Future<Boolean> deleteSubscriptionAsync(String subscription) {
366375
DeleteSubscriptionRequest request = DeleteSubscriptionRequest.newBuilder()
367376
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
368377
.build();
369-
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
378+
return transform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
370379
}
371380

372381
private static ListSubscriptionsRequest listSubscriptionsRequest(PubSubOptions serviceOptions,
@@ -388,7 +397,7 @@ private static Future<AsyncPage<Subscription>> listSubscriptionsAsync(
388397
final PubSubOptions serviceOptions, final Map<Option.OptionType, ?> options) {
389398
final ListSubscriptionsRequest request = listSubscriptionsRequest(serviceOptions, options);
390399
Future<ListSubscriptionsResponse> list = serviceOptions.rpc().list(request);
391-
return lazyTransform(list, new Function<ListSubscriptionsResponse, AsyncPage<Subscription>>() {
400+
return transform(list, new Function<ListSubscriptionsResponse, AsyncPage<Subscription>>() {
392401
@Override
393402
public AsyncPage<Subscription> apply(ListSubscriptionsResponse listSubscriptionsResponse) {
394403
List<Subscription> subscriptions = listSubscriptionsResponse.getSubscriptionsList() == null
@@ -432,7 +441,7 @@ private static Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(final St
432441
final ListTopicSubscriptionsRequest request =
433442
listSubscriptionsRequest(topic, serviceOptions, options);
434443
Future<ListTopicSubscriptionsResponse> list = serviceOptions.rpc().list(request);
435-
return lazyTransform(list,
444+
return transform(list,
436445
new Function<ListTopicSubscriptionsResponse, AsyncPage<SubscriptionId>>() {
437446
@Override
438447
public AsyncPage<SubscriptionId> apply(
@@ -493,7 +502,7 @@ public void failure(Throwable error) {
493502
// ignore
494503
}
495504
});
496-
return lazyTransform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
505+
return transform(future, new Function<PullResponse, Iterator<ReceivedMessage>>() {
497506
@Override
498507
public Iterator<ReceivedMessage> apply(PullResponse response) {
499508
return Iterators.transform(response.getReceivedMessagesList().iterator(),
@@ -540,7 +549,7 @@ public Future<Void> ackAsync(String subscription, Iterable<String> ackIds) {
540549
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
541550
.addAllAckIds(ackIds)
542551
.build();
543-
return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
552+
return transform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
544553
}
545554

546555
@Override
@@ -589,7 +598,7 @@ public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, Ti
589598
.setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit))
590599
.addAllAckIds(ackIds)
591600
.build();
592-
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
601+
return transform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
593602
}
594603

595604
static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {

0 commit comments

Comments
 (0)