Skip to content

Commit c2485ba

Browse files
hiranya911garrettjonesgoogle
authored andcommitted
Added transform and transformAsync overrides that accept an Executor (#38)
1 parent 8f2d2aa commit c2485ba

2 files changed

Lines changed: 77 additions & 0 deletions

File tree

sdk-platform-java/api-common-java/src/main/java/com/google/api/core/ApiFutures.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ public static <V, X> ApiFuture<X> transform(
100100
listenableFutureForApiFuture(input), new GaxFunctionToGuavaFunction<V, X>(function)));
101101
}
102102

103+
public static <V, X> ApiFuture<X> transform(
104+
ApiFuture<? extends V> input,
105+
final ApiFunction<? super V, ? extends X> function,
106+
Executor executor) {
107+
return new ListenableFutureToApiFuture<>(
108+
Futures.transform(
109+
listenableFutureForApiFuture(input),
110+
new GaxFunctionToGuavaFunction<V, X>(function),
111+
executor));
112+
}
113+
103114
public static <V> ApiFuture<List<V>> allAsList(
104115
Iterable<? extends ApiFuture<? extends V>> futures) {
105116
return new ListenableFutureToApiFuture<>(
@@ -128,6 +139,22 @@ public ListenableFuture<O> apply(I input) throws Exception {
128139
return new ListenableFutureToApiFuture<>(listenableOutput);
129140
}
130141

142+
public static <I, O> ApiFuture<O> transformAsync(
143+
ApiFuture<I> input, final ApiAsyncFunction<I, O> function, Executor executor) {
144+
ListenableFuture<I> listenableInput = listenableFutureForApiFuture(input);
145+
ListenableFuture<O> listenableOutput =
146+
Futures.transformAsync(
147+
listenableInput,
148+
new AsyncFunction<I, O>() {
149+
@Override
150+
public ListenableFuture<O> apply(I input) throws Exception {
151+
return listenableFutureForApiFuture(function.apply(input));
152+
}
153+
},
154+
executor);
155+
return new ListenableFutureToApiFuture<>(listenableOutput);
156+
}
157+
131158
private static <V> ListenableFuture<V> listenableFutureForApiFuture(ApiFuture<V> apiFuture) {
132159
ListenableFuture<V> listenableFuture;
133160
if (apiFuture instanceof AbstractApiFuture) {

sdk-platform-java/api-common-java/src/test/java/com/google/api/core/ApiFuturesTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.concurrent.CancellationException;
3838
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.Executor;
3940
import java.util.concurrent.atomic.AtomicInteger;
4041
import org.junit.Test;
4142

@@ -95,6 +96,31 @@ public String apply(Integer input) {
9596
assertThat(transformedFuture.get()).isEqualTo("6");
9697
}
9798

99+
@Test
100+
public void testTransformWithExecutor() throws Exception {
101+
SettableApiFuture<Integer> inputFuture = SettableApiFuture.<Integer>create();
102+
final AtomicInteger counter = new AtomicInteger(0);
103+
ApiFuture<String> transformedFuture =
104+
ApiFutures.transform(
105+
inputFuture,
106+
new ApiFunction<Integer, String>() {
107+
@Override
108+
public String apply(Integer input) {
109+
return input.toString();
110+
}
111+
},
112+
new Executor() {
113+
@Override
114+
public void execute(Runnable command) {
115+
counter.incrementAndGet();
116+
command.run();
117+
}
118+
});
119+
inputFuture.set(6);
120+
assertThat(transformedFuture.get()).isEqualTo("6");
121+
assertThat(counter.get()).isEqualTo(1);
122+
}
123+
98124
@Test
99125
public void testAllAsList() throws Exception {
100126
SettableApiFuture<Integer> inputFuture1 = SettableApiFuture.<Integer>create();
@@ -121,6 +147,30 @@ public ApiFuture<Integer> apply(Integer input) {
121147
assertThat(outputFuture.get()).isEqualTo(1);
122148
}
123149

150+
@Test
151+
public void testTransformAsyncWithExecutor() throws Exception {
152+
ApiFuture<Integer> inputFuture = ApiFutures.immediateFuture(0);
153+
final AtomicInteger counter = new AtomicInteger(0);
154+
ApiFuture<Integer> outputFuture =
155+
ApiFutures.transformAsync(
156+
inputFuture,
157+
new ApiAsyncFunction<Integer, Integer>() {
158+
@Override
159+
public ApiFuture<Integer> apply(Integer input) {
160+
return ApiFutures.immediateFuture(input + 1);
161+
}
162+
},
163+
new Executor() {
164+
@Override
165+
public void execute(Runnable command) {
166+
counter.incrementAndGet();
167+
command.run();
168+
}
169+
});
170+
assertThat(outputFuture.get()).isEqualTo(1);
171+
assertThat(counter.get()).isEqualTo(1);
172+
}
173+
124174
@Test
125175
public void testImmediateFailedFuture() throws InterruptedException {
126176
ApiFuture<String> future =

0 commit comments

Comments
 (0)