Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1514,13 +1514,18 @@ public AppendableUploadState getAppendableState(
: getBidiWriteObjectRequest(info, opts, /* appendable= */ true);
AppendableUploadState state;
if (takeOver) {
Crc32cValue.Crc32cLengthKnown initialCrc32c = null;
if (info.getCrc32c() != null && info.getSize() != null) {
initialCrc32c =
Crc32cValue.of(Utils.crc32cCodec.decode(info.getCrc32c()), info.getSize());
}
state =
BidiUploadState.appendableTakeover(
req,
Retrying::newCallContext,
maxPendingBytes,
SettableApiFuture.create(),
/* initialCrc32c= */ null);
initialCrc32c);
} else {
state =
BidiUploadState.appendableNew(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,85 @@ public void crc32cWorks() throws Exception {
}
}

@Test
public void takeoverChecksumsWorks() throws Exception {
byte[] b = new byte[10];
DataGenerator.base64Characters().fill(b, 0, 3); // ABC
DataGenerator.base64Characters().fill(b, 3, 7); // DEFGHIJ
ChecksummedTestContent existing = ChecksummedTestContent.of(b, 0, 3);
ChecksummedTestContent appendData = ChecksummedTestContent.of(b, 3, 7);
ChecksummedTestContent all = ChecksummedTestContent.of(b);

BidiWriteObjectRequest takeoverInitialReq =
BidiWriteObjectRequest.newBuilder()
.setAppendObjectSpec(
AppendObjectSpec.newBuilder()
.setBucket(METADATA.getBucket())
.setObject(METADATA.getName())
.setGeneration(METADATA.getGeneration())
.build())
.setStateLookup(true)
.build();
BidiWriteObjectResponse takeoverResNoResource =
BidiWriteObjectResponse.newBuilder()
.setPersistedSize(3)
.build();

BidiWriteObjectRequest req1 =
BidiWriteObjectRequest.newBuilder()
.setWriteOffset(3)
.setChecksummedData(appendData.asChecksummedData())
.setFinishWrite(true)
.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(all.getCrc32c()).build())
.build();
BidiWriteObjectResponse res1 =
BidiWriteObjectResponse.newBuilder()
.setResource(
Object.newBuilder()
.setName(METADATA.getName())
.setBucket(METADATA.getBucket())
.setGeneration(METADATA.getGeneration())
.setSize(10)
.setFinalizeTime(timestampNow())
.setChecksums(ObjectChecksums.newBuilder().setCrc32C(all.getCrc32c()).build())
.build())
.build();

FakeStorage fake =
FakeStorage.of(
ImmutableMap.of(
takeoverInitialReq,
respond -> respond.onNext(takeoverResNoResource),
req1,
respond -> {
respond.onNext(res1);
respond.onCompleted();
}));
try (FakeServer fakeServer = FakeServer.of(fake);
Storage storage = fakeServer.getGrpcStorageOptions().toBuilder().build().getService()) {
BlobId id = BlobId.of("b", "o", METADATA.getGeneration());

BlobInfo done1 =
BlobInfo.newBuilder(id)
.setSize(3L)
.setCrc32c(Utils.crc32cCodec.encode(existing.getCrc32c()))
.build();

BlobAppendableUploadConfig config =
BlobAppendableUploadConfig.of()
.withFlushPolicy(FlushPolicy.maxFlushSize(10))
.withCloseAction(CloseAction.FINALIZE_WHEN_CLOSING);
BlobAppendableUpload upload =
storage.blobAppendableUpload(done1, config);
try (AppendableUploadWriteableByteChannel channel = upload.open()) {
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(appendData.getBytes()), channel);
}
ApiFuture<BlobInfo> result = upload.getResult();
BlobInfo finalInfo = result.get(5, TimeUnit.SECONDS);
assertThat(finalInfo.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(all.getCrc32c()));
}
}

/**
* If a stream is held open for an extended period (i.e. longer than the configured retry timeout)
* and the server returns an error, we want to make sure the currently pending request is able to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,15 @@ public void appendableBlobUploadTakeover() throws Exception {
assertThat(done1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(c1.getCrc32c()));

BlobAppendableUpload takeOver =
storage.blobAppendableUpload(
BlobInfo.newBuilder(done1.getBlobId()).build(), p.uploadConfig);
storage.blobAppendableUpload(done1, p.uploadConfig);
try (AppendableUploadWriteableByteChannel channel = takeOver.open()) {
int written = Buffers.emptyTo(ByteBuffer.wrap(c2.getBytes()), channel);
assertThat(written).isEqualTo(c2.length());
}
BlobInfo done2 = takeOver.getResult().get(5, TimeUnit.SECONDS);

assertThat(done2.getSize()).isEqualTo(p.content.length());
assertThat(done2.getCrc32c()).isAnyOf(Utils.crc32cCodec.encode(p.content.getCrc32c()), null);
assertThat(done2.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c()));
}

@Test
Expand Down Expand Up @@ -251,15 +250,16 @@ public void takeoverJustToFinalizeWorks() throws Exception {
assertThat(done1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c()));

BlobAppendableUpload takeOver =
storage.blobAppendableUpload(
BlobInfo.newBuilder(done1.getBlobId()).build(), p.uploadConfig);
storage.blobAppendableUpload(done1, p.uploadConfig);
takeOver.open().finalizeAndClose();

BlobInfo done2 = takeOver.getResult().get(5, TimeUnit.SECONDS);
assertAll(
() -> assertThat(done2).isNotNull(),
() -> assertThat(done2.getSize()).isEqualTo(p.content.length()),
() -> assertThat(done2.getCrc32c()).isNotNull());
() ->
assertThat(done2.getCrc32c())
.isEqualTo(Utils.crc32cCodec.encode(p.content.getCrc32c())));
}

private void checkTestbenchIssue733() {
Expand Down
Loading