Skip to content

Commit 2a92e35

Browse files
committed
chore: make accumulating read implement ApiFuture to allow integration with cancellation
1 parent 3b195fe commit 2a92e35

5 files changed

Lines changed: 89 additions & 44 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23-
import com.google.api.core.SettableApiFuture;
2423
import com.google.api.gax.grpc.GrpcCallContext;
2524
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2625
import com.google.cloud.storage.ObjectReadSessionStreamRead.AccumulatingRead;
@@ -89,12 +88,11 @@ public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
8988
try {
9089
checkState(open, "stream already closed");
9190
long readId = state.newReadId();
92-
SettableApiFuture<byte[]> future = SettableApiFuture.create();
9391
AccumulatingRead<byte[]> read =
9492
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
95-
readId, range, retryContextProvider.create(), future);
93+
readId, range, retryContextProvider.create());
9694
registerReadInState(readId, read);
97-
return future;
95+
return read;
9896
} finally {
9997
lock.unlock();
10098
}
@@ -120,12 +118,11 @@ public ApiFuture<DisposableByteString> readRangeAsByteString(RangeSpec range) {
120118
try {
121119
checkState(open, "stream already closed");
122120
long readId = state.newReadId();
123-
SettableApiFuture<DisposableByteString> future = SettableApiFuture.create();
124121
AccumulatingRead<DisposableByteString> read =
125122
ObjectReadSessionStreamRead.createZeroCopyByteStringAccumulatingRead(
126-
readId, range, retryContextProvider.create(), future);
123+
readId, range, retryContextProvider.create());
127124
registerReadInState(readId, read);
128-
return future;
125+
return read;
129126
} finally {
130127
lock.unlock();
131128
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStreamRead.java

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
import java.util.List;
3939
import java.util.concurrent.ArrayBlockingQueue;
4040
import java.util.concurrent.BlockingQueue;
41+
import java.util.concurrent.CancellationException;
42+
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.Executor;
44+
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.TimeoutException;
4146
import java.util.concurrent.atomic.AtomicLong;
4247
import org.checkerframework.checker.nullness.qual.Nullable;
4348

@@ -139,40 +144,33 @@ void setOnCloseCallback(IOAutoCloseable onCloseCallback) {
139144
protected abstract void internalClose() throws IOException;
140145

141146
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
142-
long readId,
143-
RangeSpec rangeSpec,
144-
RetryContext retryContext,
145-
SettableApiFuture<byte[]> complete) {
146-
return new ByteArrayAccumulatingRead(
147-
readId, rangeSpec, retryContext, complete, IOAutoCloseable.noOp());
147+
long readId, RangeSpec rangeSpec, RetryContext retryContext) {
148+
return new ByteArrayAccumulatingRead(readId, rangeSpec, retryContext, IOAutoCloseable.noOp());
148149
}
149150

150151
static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRead(
151-
long readId,
152-
RangeSpec rangeSpec,
153-
RetryContext retryContext,
154-
SettableApiFuture<DisposableByteString> complete) {
152+
long readId, RangeSpec rangeSpec, RetryContext retryContext) {
155153
return new ZeroCopyByteStringAccumulatingRead(
156-
readId, rangeSpec, retryContext, complete, IOAutoCloseable.noOp());
154+
readId, rangeSpec, retryContext, IOAutoCloseable.noOp());
157155
}
158156

159157
static StreamingRead streamingRead(long readId, RangeSpec rangeSpec, RetryContext retryContext) {
160158
return new StreamingRead(readId, rangeSpec, retryContext, false, IOAutoCloseable.noOp());
161159
}
162160

163161
/** Base class of a read that will accumulate before completing by resolving a future */
164-
abstract static class AccumulatingRead<Result> extends ObjectReadSessionStreamRead {
162+
abstract static class AccumulatingRead<Result> extends ObjectReadSessionStreamRead
163+
implements ApiFuture<Result> {
165164
protected final List<ChildRef> childRefs;
166165
protected final SettableApiFuture<Result> complete;
167166

168167
private AccumulatingRead(
169168
long readId,
170169
RangeSpec rangeSpec,
171170
RetryContext retryContext,
172-
SettableApiFuture<Result> complete,
173171
IOAutoCloseable onCloseCallback) {
174172
super(readId, rangeSpec, retryContext, onCloseCallback);
175-
this.complete = complete;
173+
this.complete = SettableApiFuture.create();
176174
this.childRefs = Collections.synchronizedList(new ArrayList<>());
177175
}
178176

@@ -209,7 +207,7 @@ ApiFuture<?> fail(Throwable t) {
209207
tombstoned = true;
210208
close();
211209
} catch (IOException e) {
212-
t.addSuppressed(t);
210+
t.addSuppressed(e);
213211
} finally {
214212
complete.setException(t);
215213
}
@@ -225,6 +223,40 @@ public void internalClose() throws IOException {
225223
}
226224
}
227225

226+
@Override
227+
public void addListener(Runnable listener, Executor executor) {
228+
complete.addListener(listener, executor);
229+
}
230+
231+
@Override
232+
public boolean cancel(boolean mayInterruptIfRunning) {
233+
if (!complete.isCancelled()) {
234+
fail(new CancellationException());
235+
}
236+
return complete.cancel(mayInterruptIfRunning);
237+
}
238+
239+
@Override
240+
public Result get() throws InterruptedException, ExecutionException {
241+
return complete.get();
242+
}
243+
244+
@Override
245+
public Result get(long timeout, TimeUnit unit)
246+
throws InterruptedException, ExecutionException, TimeoutException {
247+
return complete.get(timeout, unit);
248+
}
249+
250+
@Override
251+
public boolean isCancelled() {
252+
return complete.isCancelled();
253+
}
254+
255+
@Override
256+
public boolean isDone() {
257+
return complete.isDone();
258+
}
259+
228260
@Override
229261
protected boolean canShareStreamWith(Class<? extends ObjectReadSessionStreamRead> clazz) {
230262
return AccumulatingRead.class.isAssignableFrom(clazz);
@@ -476,9 +508,8 @@ private ByteArrayAccumulatingRead(
476508
long readId,
477509
RangeSpec rangeSpec,
478510
RetryContext retryContext,
479-
SettableApiFuture<byte[]> complete,
480511
IOAutoCloseable onCloseCallback) {
481-
super(readId, rangeSpec, retryContext, complete, onCloseCallback);
512+
super(readId, rangeSpec, retryContext, onCloseCallback);
482513
}
483514

484515
private ByteArrayAccumulatingRead(
@@ -539,9 +570,8 @@ private ZeroCopyByteStringAccumulatingRead(
539570
long readId,
540571
RangeSpec rangeSpec,
541572
RetryContext retryContext,
542-
SettableApiFuture<DisposableByteString> complete,
543573
IOAutoCloseable onCloseCallback) {
544-
super(readId, rangeSpec, retryContext, complete, onCloseCallback);
574+
super(readId, rangeSpec, retryContext, onCloseCallback);
545575
}
546576

547577
public ZeroCopyByteStringAccumulatingRead(

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStateTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ public void getOpenArguments_includesAllRelevantModifications() throws Exception
8080

8181
AccumulatingRead<byte[]> r1 =
8282
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
83-
1, RangeSpec.of(3, 4), neverRetry, f1);
83+
1, RangeSpec.of(3, 4), neverRetry);
8484
AccumulatingRead<byte[]> r2 =
8585
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
86-
2, RangeSpec.of(19, 14), neverRetry, f2);
86+
2, RangeSpec.of(19, 14), neverRetry);
8787

8888
state.putOutstandingRead(1, r1);
8989
state.putOutstandingRead(2, r2);
@@ -223,7 +223,7 @@ public void canHandleNewRead() throws Exception {
223223

224224
try (AccumulatingRead<byte[]> bytes =
225225
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
226-
2, RangeSpec.all(), RetryContext.neverRetry(), SettableApiFuture.create());
226+
2, RangeSpec.all(), RetryContext.neverRetry());
227227
StreamingRead streaming2 =
228228
ObjectReadSessionStreamRead.streamingRead(
229229
4, RangeSpec.all(), RetryContext.neverRetry())) {

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static com.google.common.truth.Truth.assertThat;
2222
import static org.junit.Assert.assertThrows;
2323

24-
import com.google.api.core.SettableApiFuture;
2524
import com.google.cloud.storage.ObjectReadSessionStreamRead.AccumulatingRead;
2625
import com.google.cloud.storage.ObjectReadSessionStreamRead.StreamingRead;
2726
import com.google.cloud.storage.ObjectReadSessionStreamRead.ZeroCopyByteStringAccumulatingRead;
@@ -60,17 +59,16 @@ public void byteArrayAccumulatingRead_happyPath()
6059
handle.borrow(Function.identity());
6160
handle.close();
6261

63-
SettableApiFuture<byte[]> complete = SettableApiFuture.create();
6462
AccumulatingRead<byte[]> byteArrayAccumulatingRead =
6563
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
66-
1, RangeSpec.of(0, 137), RetryContext.neverRetry(), complete);
64+
1, RangeSpec.of(0, 137), RetryContext.neverRetry());
6765

6866
byteArrayAccumulatingRead.accept(childRef);
6967
byteArrayAccumulatingRead.eof();
7068

7169
String expectedBytes = xxd(genBytes);
7270

73-
byte[] actualFutureBytes = complete.get(1, TimeUnit.SECONDS);
71+
byte[] actualFutureBytes = byteArrayAccumulatingRead.get(1, TimeUnit.SECONDS);
7472
assertThat(xxd(actualFutureBytes)).isEqualTo(expectedBytes);
7573
assertThat(closed.get()).isTrue();
7674
}
@@ -90,10 +88,9 @@ public void byteArrayAccumulatingRead_childRef_close_ioException_propagated() th
9088
handle.borrow(Function.identity());
9189
handle.close();
9290

93-
SettableApiFuture<byte[]> complete = SettableApiFuture.create();
9491
AccumulatingRead<byte[]> byteArrayAccumulatingRead =
9592
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
96-
1, RangeSpec.of(0, 137), RetryContext.neverRetry(), complete);
93+
1, RangeSpec.of(0, 137), RetryContext.neverRetry());
9794

9895
IOException ioException =
9996
assertThrows(
@@ -108,11 +105,10 @@ public void byteArrayAccumulatingRead_childRef_close_ioException_propagated() th
108105
@Test
109106
public void byteArrayAccumulatingRead_producesAnAccurateReadRange()
110107
throws IOException, ExecutionException, InterruptedException, TimeoutException {
111-
SettableApiFuture<byte[]> complete = SettableApiFuture.create();
112108
int readId = 1;
113109
try (AccumulatingRead<byte[]> read =
114110
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
115-
readId, RangeSpec.of(0, 137), RetryContext.neverRetry(), complete)) {
111+
readId, RangeSpec.of(0, 137), RetryContext.neverRetry())) {
116112

117113
ReadRange readRange1 = read.makeReadRange();
118114
ReadRange expectedReadRange1 =
@@ -151,7 +147,7 @@ public void byteArrayAccumulatingRead_producesAnAccurateReadRange()
151147
ReadRange.newBuilder().setReadId(readId).setReadOffset(137).setReadLength(0).build();
152148
assertThat(readRange4).isEqualTo(expectedReadRange4);
153149

154-
byte[] actualBytes = complete.get(1, TimeUnit.SECONDS);
150+
byte[] actualBytes = read.get(1, TimeUnit.SECONDS);
155151
assertThat(xxd(actualBytes)).isEqualTo(xxd(DataGenerator.base64Characters().genBytes(137)));
156152
}
157153
}
@@ -456,10 +452,10 @@ public void streamingRead_withNewReadIdDoesNotOrphanAnyData() throws Exception {
456452
public void canShareStreamWith() throws Exception {
457453
try (AccumulatingRead<byte[]> bytes =
458454
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
459-
1, RangeSpec.all(), RetryContext.neverRetry(), SettableApiFuture.create());
455+
1, RangeSpec.all(), RetryContext.neverRetry());
460456
ZeroCopyByteStringAccumulatingRead byteString =
461457
ObjectReadSessionStreamRead.createZeroCopyByteStringAccumulatingRead(
462-
2, RangeSpec.all(), RetryContext.neverRetry(), SettableApiFuture.create());
458+
2, RangeSpec.all(), RetryContext.neverRetry());
463459
StreamingRead streamingRead =
464460
ObjectReadSessionStreamRead.streamingRead(
465461
3, RangeSpec.all(), RetryContext.neverRetry())) {
@@ -505,6 +501,30 @@ protected void internalClose() throws IOException {
505501
() -> assertThat(closed.get()).isTrue());
506502
}
507503

504+
@Test
505+
public void accumulating_futureCancel_disposes() throws IOException {
506+
byte[] genBytes = DataGenerator.base64Characters().genBytes(137);
507+
ByteString byteString = UnsafeByteOperations.unsafeWrap(genBytes);
508+
AtomicBoolean closed = new AtomicBoolean(false);
509+
Closeable close = () -> closed.set(true);
510+
ResponseContentLifecycleHandle<ByteString> handle =
511+
ResponseContentLifecycleHandle.create(
512+
byteString, ByteString::asReadOnlyByteBufferList, close);
513+
ResponseContentLifecycleHandle<ByteString>.ChildRef childRef =
514+
handle.borrow(Function.identity());
515+
handle.close();
516+
517+
AccumulatingRead<byte[]> byteArrayAccumulatingRead =
518+
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
519+
1, RangeSpec.of(0, 137), RetryContext.neverRetry());
520+
521+
byteArrayAccumulatingRead.accept(childRef);
522+
523+
byteArrayAccumulatingRead.cancel(true);
524+
525+
assertThat(closed.get()).isTrue();
526+
}
527+
508528
private static ResponseContentLifecycleHandle<ByteString> noopContentHandle(
509529
ByteString byteString) {
510530
return ResponseContentLifecycleHandle.create(

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,9 @@ public void streamingRead_mustCloseQueuedResponsesWhenFailed() throws Exception
282282

283283
@Test
284284
public void accumulatingRead_mustCloseQueuedResponsesWhenFailed() throws Exception {
285-
SettableApiFuture<byte[]> complete = SettableApiFuture.create();
286285
try (AccumulatingRead<byte[]> read1 =
287286
ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
288-
1, RangeSpec.all(), RetryContext.neverRetry(), complete)) {
287+
1, RangeSpec.all(), RetryContext.neverRetry())) {
289288
state.putOutstandingRead(1, read1);
290289
ObjectReadSessionStream stream =
291290
ObjectReadSessionStream.create(exec, callable, state, RetryContext.neverRetry());
@@ -314,8 +313,7 @@ public void accumulatingRead_mustCloseQueuedResponsesWhenFailed() throws Excepti
314313
stream.close();
315314

316315
StorageException se =
317-
assertThrows(
318-
StorageException.class, () -> TestUtils.await(complete, 2, TimeUnit.SECONDS));
316+
assertThrows(StorageException.class, () -> TestUtils.await(read1, 2, TimeUnit.SECONDS));
319317
assertAll(
320318
() -> assertThat(bytes1Close.get()).isTrue(),
321319
() -> assertThat(bytes2Close.get()).isTrue(),

0 commit comments

Comments
 (0)