Skip to content

Commit 944e0bc

Browse files
committed
chore: add test to validate checksum validation handling for RangeProjectionConfigs.asChannel
Cleanup state change to reuse the same channel instance rather than allocating a new one. For byte[] a new container isn't a big deal, as that container is still below the future. For ScatteringByteChannel the channel is provided to the user of the method, and they will hold onto the instance for its lifetime. This avoids a potential inconsistency on the external state of the BlobReadSession and the internal state, where the readId could diverge despite using the same response queue.
1 parent eb8c5c8 commit 944e0bc

File tree

5 files changed

+121
-57
lines changed

5 files changed

+121
-57
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BaseObjectReadSessionStreamRead.java

Lines changed: 29 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
abstract class BaseObjectReadSessionStreamRead<Projection>
5151
implements ObjectReadSessionStreamRead<Projection> {
5252

53-
protected final long readId;
5453
protected final RangeSpec rangeSpec;
5554
protected final RetryContext retryContext;
5655
protected final AtomicLong readOffset;
@@ -59,22 +58,16 @@ abstract class BaseObjectReadSessionStreamRead<Projection>
5958
protected IOAutoCloseable onCloseCallback;
6059

6160
BaseObjectReadSessionStreamRead(
62-
long readId,
63-
RangeSpec rangeSpec,
64-
RetryContext retryContext,
65-
IOAutoCloseable onCloseCallback) {
66-
this(
67-
readId, rangeSpec, new AtomicLong(rangeSpec.begin()), retryContext, onCloseCallback, false);
61+
RangeSpec rangeSpec, RetryContext retryContext, IOAutoCloseable onCloseCallback) {
62+
this(rangeSpec, new AtomicLong(rangeSpec.begin()), retryContext, onCloseCallback, false);
6863
}
6964

7065
BaseObjectReadSessionStreamRead(
71-
long readId,
7266
RangeSpec rangeSpec,
7367
AtomicLong readOffset,
7468
RetryContext retryContext,
7569
IOAutoCloseable onCloseCallback,
7670
boolean closed) {
77-
this.readId = readId;
7871
this.rangeSpec = rangeSpec;
7972
this.retryContext = retryContext;
8073
this.readOffset = readOffset;
@@ -83,6 +76,8 @@ abstract class BaseObjectReadSessionStreamRead<Projection>
8376
this.onCloseCallback = onCloseCallback;
8477
}
8578

79+
abstract long readId();
80+
8681
@Override
8782
public long readOffset() {
8883
return readOffset.get();
@@ -96,7 +91,7 @@ public final void preFail() {
9691
@Override
9792
public final ReadRange makeReadRange() {
9893
long currentOffset = readOffset.get();
99-
ReadRange.Builder b = ReadRange.newBuilder().setReadId(readId).setReadOffset(currentOffset);
94+
ReadRange.Builder b = ReadRange.newBuilder().setReadId(readId()).setReadOffset(currentOffset);
10095
rangeSpec
10196
.limit()
10297
.ifPresent(
@@ -141,13 +136,15 @@ abstract static class AccumulatingRead<Result>
141136
extends BaseObjectReadSessionStreamRead<ApiFuture<Result>> implements ApiFuture<Result> {
142137
protected final List<ChildRef> childRefs;
143138
protected final SettableApiFuture<Result> complete;
139+
protected final long readId;
144140

145141
private AccumulatingRead(
146142
long readId,
147143
RangeSpec rangeSpec,
148144
RetryContext retryContext,
149145
IOAutoCloseable onCloseCallback) {
150-
super(readId, rangeSpec, retryContext, onCloseCallback);
146+
super(rangeSpec, retryContext, onCloseCallback);
147+
this.readId = readId;
151148
this.complete = SettableApiFuture.create();
152149
this.childRefs = Collections.synchronizedList(new ArrayList<>());
153150
}
@@ -161,11 +158,17 @@ private AccumulatingRead(
161158
boolean closed,
162159
SettableApiFuture<Result> complete,
163160
IOAutoCloseable onCloseCallback) {
164-
super(readId, rangeSpec, readOffset, retryContext, onCloseCallback, closed);
161+
super(rangeSpec, readOffset, retryContext, onCloseCallback, closed);
162+
this.readId = readId;
165163
this.childRefs = childRefs;
166164
this.complete = complete;
167165
}
168166

167+
@Override
168+
long readId() {
169+
return readId;
170+
}
171+
169172
@Override
170173
public boolean acceptingBytes() {
171174
return !complete.isDone() && !tombstoned;
@@ -247,47 +250,31 @@ public boolean canShareStreamWith(ObjectReadSessionStreamRead<?> other) {
247250
*/
248251
static class StreamingRead extends BaseObjectReadSessionStreamRead<ScatteringByteChannel>
249252
implements UnbufferedReadableByteChannel {
253+
250254
private final SettableApiFuture<Void> failFuture;
251255
private final BlockingQueue<Closeable> queue;
252256

257+
private AtomicLong readId;
253258
private boolean complete;
254259
@Nullable private ChildRefHelper leftovers;
255260

256261
StreamingRead(
257262
long readId,
258263
RangeSpec rangeSpec,
259264
RetryContext retryContext,
260-
boolean closed,
261265
IOAutoCloseable onCloseCallback) {
262-
this(
263-
readId,
264-
rangeSpec,
265-
new AtomicLong(rangeSpec.begin()),
266-
retryContext,
267-
closed,
268-
SettableApiFuture.create(),
269-
new ArrayBlockingQueue<>(2),
270-
false,
271-
null,
272-
onCloseCallback);
266+
super(rangeSpec, retryContext, onCloseCallback);
267+
this.readId = new AtomicLong(readId);
268+
this.closed = false;
269+
this.failFuture = SettableApiFuture.create();
270+
this.queue = new ArrayBlockingQueue<>(2);
271+
this.complete = false;
272+
this.leftovers = null;
273273
}
274274

275-
private StreamingRead(
276-
long newReadId,
277-
RangeSpec rangeSpec,
278-
AtomicLong readOffset,
279-
RetryContext retryContext,
280-
boolean closed,
281-
SettableApiFuture<Void> failFuture,
282-
BlockingQueue<Closeable> queue,
283-
boolean complete,
284-
@Nullable ChildRefHelper leftovers,
285-
IOAutoCloseable onCloseCallback) {
286-
super(newReadId, rangeSpec, readOffset, retryContext, onCloseCallback, closed);
287-
this.failFuture = failFuture;
288-
this.queue = queue;
289-
this.complete = complete;
290-
this.leftovers = leftovers;
275+
@Override
276+
long readId() {
277+
return readId.get();
291278
}
292279

293280
@Override
@@ -323,18 +310,8 @@ public ApiFuture<?> fail(Throwable t) {
323310

324311
@Override
325312
public StreamingRead withNewReadId(long newReadId) {
326-
tombstoned = true;
327-
return new StreamingRead(
328-
newReadId,
329-
rangeSpec,
330-
readOffset,
331-
retryContext,
332-
closed,
333-
failFuture,
334-
queue,
335-
complete,
336-
leftovers,
337-
onCloseCallback);
313+
readId.set(newReadId);
314+
return this;
338315
}
339316

340317
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStreamRead.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,6 @@ static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRe
6969
}
7070

7171
static StreamingRead streamingRead(long readId, RangeSpec rangeSpec, RetryContext retryContext) {
72-
return new StreamingRead(readId, rangeSpec, retryContext, false, IOAutoCloseable.noOp());
72+
return new StreamingRead(readId, rangeSpec, retryContext, IOAutoCloseable.noOp());
7373
}
7474
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import java.nio.channels.AsynchronousCloseException;
8282
import java.nio.channels.Channels;
8383
import java.nio.channels.ScatteringByteChannel;
84+
import java.nio.channels.WritableByteChannel;
8485
import java.security.Key;
8586
import java.util.Collection;
8687
import java.util.Collections;
@@ -1453,6 +1454,87 @@ public void gettingSessionFromFastOpenKeepsTheSessionOpenUntilClosed() throws Ex
14531454
}
14541455
}
14551456

1457+
@Test
1458+
public void expectRetryForRangeWithFailedChecksumValidation_channel() throws Exception {
1459+
ChecksummedTestContent expected = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 20);
1460+
1461+
ChecksummedTestContent content2_1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10);
1462+
ChecksummedTestContent content2_2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 20, 10);
1463+
BidiReadObjectRequest req2 = read(1, 10, 20);
1464+
BidiReadObjectResponse res2_1 =
1465+
BidiReadObjectResponse.newBuilder()
1466+
.addObjectDataRanges(
1467+
ObjectRangeData.newBuilder()
1468+
.setChecksummedData(content2_1.asChecksummedData())
1469+
.setReadRange(getReadRange(1, 10, 10))
1470+
.build())
1471+
.build();
1472+
BidiReadObjectResponse res2_2 =
1473+
BidiReadObjectResponse.newBuilder()
1474+
.setMetadata(METADATA)
1475+
.addObjectDataRanges(
1476+
ObjectRangeData.newBuilder()
1477+
.setChecksummedData(content2_2.asChecksummedData().toBuilder().setCrc32C(1))
1478+
.setReadRange(getReadRange(1, 20, 10))
1479+
.setRangeEnd(true)
1480+
.build())
1481+
.build();
1482+
1483+
BidiReadObjectRequest req3 =
1484+
BidiReadObjectRequest.newBuilder().addReadRanges(getReadRange(2, 20, 10)).build();
1485+
BidiReadObjectResponse res3 =
1486+
BidiReadObjectResponse.newBuilder()
1487+
.addObjectDataRanges(
1488+
ObjectRangeData.newBuilder()
1489+
.setChecksummedData(content2_2.asChecksummedData())
1490+
.setReadRange(getReadRange(2, 20, 10))
1491+
.setRangeEnd(true)
1492+
.build())
1493+
.build();
1494+
1495+
FakeStorage fake =
1496+
FakeStorage.of(
1497+
ImmutableMap.of(
1498+
REQ_OPEN,
1499+
respond -> respond.onNext(RES_OPEN),
1500+
req2,
1501+
respond -> {
1502+
respond.onNext(res2_1);
1503+
respond.onNext(res2_2);
1504+
},
1505+
req3,
1506+
respond -> respond.onNext(res3)));
1507+
1508+
try (FakeServer fakeServer = FakeServer.of(fake);
1509+
Storage storage = fakeServer.getGrpcStorageOptions().getService()) {
1510+
1511+
BlobId id = BlobId.of("b", "o");
1512+
ApiFuture<BlobReadSession> futureObjectDescriptor = storage.blobReadSession(id);
1513+
1514+
try (BlobReadSession bd = futureObjectDescriptor.get(5, TimeUnit.SECONDS)) {
1515+
1516+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1517+
try (ScatteringByteChannel r =
1518+
bd.readRange(RangeSpec.of(10, 20), RangeProjectionConfigs.asChannel());
1519+
WritableByteChannel w = Channels.newChannel(baos)) {
1520+
ByteStreams.copy(r, w);
1521+
}
1522+
1523+
byte[] actual = baos.toByteArray();
1524+
Crc32cLengthKnown actualCrc32c = Hasher.enabled().hash(ByteBuffer.wrap(actual));
1525+
1526+
byte[] expectedBytes = expected.getBytes();
1527+
Crc32cLengthKnown expectedCrc32c =
1528+
Crc32cValue.of(expected.getCrc32c(), expectedBytes.length);
1529+
1530+
assertAll(
1531+
() -> assertThat(actual).hasLength(expectedBytes.length),
1532+
() -> assertThat(xxd(actual)).isEqualTo(xxd(expectedBytes)),
1533+
() -> assertThat(actualCrc32c).isEqualTo(expectedCrc32c));
1534+
}
1535+
}
1536+
}
1537+
14561538
private static void runTestAgainstFakeServer(
14571539
FakeStorage fakeStorage, RangeSpec range, ChecksummedTestContent expected) throws Exception {
14581540

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamReadTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,9 @@ public void streamingRead_withNewReadIdDoesNotOrphanAnyData() throws Exception {
435435
assertThat(read1.read(slice2)).isEqualTo(16);
436436
buf.position(buf.position() + 16);
437437

438-
// switch read is (like would happen during a retry)
438+
// update read id (like would happen during a retry)
439439
StreamingRead read2 = read1.withNewReadId(2);
440-
assertThat(read1.acceptingBytes()).isFalse(); // make sure read1 is no longer accepting bytes
441-
assertThat(read2.acceptingBytes()).isTrue();
440+
assertThat(read2).isSameInstanceAs(read1);
442441

443442
// make sure we can read from both read1 and read 2
444443
ByteBuffer slice3 = (ByteBuffer) buf.slice().limit(16);

google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionStreamTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,17 +327,23 @@ static class TestObjectReadSessionStreamRead
327327
extends BaseObjectReadSessionStreamRead<java.lang.Object> {
328328

329329
private static final AtomicLong readIdSeq = new AtomicLong(1);
330+
protected final long readId;
330331
private boolean readyToSend = false;
331332
private final SettableApiFuture<Throwable> fail = SettableApiFuture.create();
332333

333334
TestObjectReadSessionStreamRead(long readId, RangeSpec rangeSpec, RetryContext retryContext) {
334335
super(
335-
readId,
336336
rangeSpec,
337337
new AtomicLong(rangeSpec.begin()),
338338
retryContext,
339339
IOAutoCloseable.noOp(),
340340
false);
341+
this.readId = readId;
342+
}
343+
344+
@Override
345+
long readId() {
346+
return readId;
341347
}
342348

343349
@Override

0 commit comments

Comments
 (0)