Skip to content

Commit bee4308

Browse files
JesseLovelaceBenWhitehead
authored andcommitted
chore: do a metadata lookup if we try to do a reconnect and don't have a generation
1 parent 28f2759 commit bee4308

File tree

4 files changed

+193
-18
lines changed

4 files changed

+193
-18
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import com.google.api.gax.rpc.ApiStreamObserver;
2323
import com.google.api.gax.rpc.BidiStreamingCallable;
2424
import com.google.api.gax.rpc.ErrorDetails;
25+
import com.google.api.gax.rpc.NotFoundException;
2526
import com.google.api.gax.rpc.OutOfRangeException;
27+
import com.google.api.gax.rpc.UnaryCallable;
2628
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
2729
import com.google.cloud.storage.Conversions.Decoder;
2830
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
@@ -33,12 +35,15 @@
3335
import com.google.common.collect.ImmutableList;
3436
import com.google.common.collect.ImmutableMap;
3537
import com.google.protobuf.ByteString;
38+
import com.google.protobuf.FieldMask;
3639
import com.google.storage.v2.AppendObjectSpec;
3740
import com.google.storage.v2.BidiWriteHandle;
3841
import com.google.storage.v2.BidiWriteObjectRedirectedError;
3942
import com.google.storage.v2.BidiWriteObjectRequest;
4043
import com.google.storage.v2.BidiWriteObjectResponse;
4144
import com.google.storage.v2.ChecksummedData;
45+
import com.google.storage.v2.GetObjectRequest;
46+
import com.google.storage.v2.Object;
4247
import com.google.storage.v2.ObjectChecksums;
4348
import java.io.IOException;
4449
import java.nio.ByteBuffer;
@@ -61,6 +66,7 @@
6166
final class GapicBidiUnbufferedAppendableWritableByteChannel
6267
implements UnbufferedWritableByteChannel {
6368
private final BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write;
69+
private final UnaryCallable<GetObjectRequest, Object> get;
6470
private final RetrierWithAlg retrier;
6571
private final SettableApiFuture<BidiWriteObjectResponse> resultFuture;
6672
private final ChunkSegmenter chunkSegmenter;
@@ -87,16 +93,17 @@ final class GapicBidiUnbufferedAppendableWritableByteChannel
8793

8894
GapicBidiUnbufferedAppendableWritableByteChannel(
8995
BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write,
96+
UnaryCallable<GetObjectRequest, Object> get,
9097
RetrierWithAlg retrier,
9198
SettableApiFuture<BidiWriteObjectResponse> resultFuture,
9299
ChunkSegmenter chunkSegmenter,
93100
BidiWriteCtx<BidiAppendableWrite> writeCtx,
94101
Supplier<GrpcCallContext> baseContextSupplier) {
95102
this.write = write;
103+
this.get = get;
96104
this.retrier = retrier;
97105
this.resultFuture = resultFuture;
98106
this.chunkSegmenter = chunkSegmenter;
99-
100107
this.writeCtx = writeCtx;
101108
this.responseObserver = new RedirectHandlingResponseObserver(new BidiObserver());
102109
this.baseContextSupplier = baseContextSupplier;
@@ -163,16 +170,24 @@ void restart() {
163170
if (!resultFuture.isDone()) {
164171
ApiStreamObserver<BidiWriteObjectRequest> requestStream1 =
165172
openedStream(reconnectArguments.getCtx());
166-
requestStream1.onNext(req);
167-
lastWrittenRequest = req;
168-
responseObserver.await();
169-
first = false;
173+
if (req != null) {
174+
requestStream1.onNext(req);
175+
lastWrittenRequest = req;
176+
responseObserver.await();
177+
first = false;
178+
} else {
179+
// This means we did a metadata lookup and determined that GCS never received the initial
180+
// WriteObjectSpec,
181+
// So we can just start over and send it again
182+
first = true;
183+
}
170184
}
171185
}
172186

173187
public void startAppendableTakeoverStream() {
174188
BidiWriteObjectRequest req =
175189
writeCtx.newRequestBuilder().setFlush(true).setStateLookup(true).build();
190+
generation.set(req.getAppendObjectSpec().getGeneration());
176191
this.messages = Collections.singletonList(req);
177192
flush();
178193
first = false;
@@ -259,9 +274,9 @@ private BidiWriteObjectRequest finishMessage() {
259274
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();
260275

261276
BidiWriteObjectRequest.Builder b = writeCtx.newRequestBuilder();
262-
if (!first) {
263-
b.clearUploadId().clearObjectChecksums().clearWriteObjectSpec().clearAppendObjectSpec();
264-
}
277+
278+
b.clearUploadId().clearObjectChecksums().clearWriteObjectSpec().clearAppendObjectSpec();
279+
265280
b.setFinishWrite(true).setWriteOffset(offset);
266281
if (crc32cValue != null) {
267282
b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
@@ -299,6 +314,7 @@ private void flush() {
299314
try {
300315
ApiStreamObserver<BidiWriteObjectRequest> opened = openedStream(context);
301316
for (BidiWriteObjectRequest message : this.messages) {
317+
302318
opened.onNext(message);
303319
lastWrittenRequest = message;
304320
}
@@ -509,6 +525,7 @@ public void onCompleted() {
509525

510526
private void ok(BidiWriteObjectResponse value) {
511527
last = value;
528+
first = false;
512529
sem.release();
513530
}
514531

@@ -685,6 +702,42 @@ ReconnectArguments getReconnectArguments() {
685702
long generation = this.generation.get();
686703
if (generation > 0) {
687704
spec.setGeneration(generation);
705+
} else {
706+
GetObjectRequest req =
707+
GetObjectRequest.newBuilder()
708+
.setBucket(spec.getBucket())
709+
.setObject(spec.getObject())
710+
.setReadMask(
711+
FieldMask.newBuilder()
712+
.addPaths(Storage.BlobField.GENERATION.getGrpcName())
713+
.build())
714+
.build();
715+
boolean objectNotFound = false;
716+
try {
717+
retrier.run(
718+
() -> {
719+
this.generation.set(get.call(req).getGeneration());
720+
return null;
721+
},
722+
Decoder.identity());
723+
} catch (Throwable t) {
724+
if (t.getCause() instanceof NotFoundException) {
725+
objectNotFound = true;
726+
} else {
727+
t.addSuppressed(new AsyncStorageTaskException());
728+
throw t;
729+
}
730+
}
731+
generation = this.generation.get();
732+
if (generation > 0) {
733+
spec.setGeneration(generation);
734+
} else if (objectNotFound) {
735+
// If the object wasn't found, that means GCS never saw the initial WriteObjectSpec, which
736+
// means we'll need
737+
// to send it again. We can process this retry by just starting over again
738+
return ReconnectArguments.of(
739+
baseContextSupplier.get().withExtraHeaders(getHeaders()), null);
740+
}
688741
}
689742

690743
BidiWriteHandle bidiWriteHandle = this.bidiWriteHandle.get();

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.SettableApiFuture;
2323
import com.google.api.gax.rpc.BidiStreamingCallable;
24+
import com.google.api.gax.rpc.UnaryCallable;
2425
import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
2526
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2627
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2728
import com.google.storage.v2.BidiWriteObjectRequest;
2829
import com.google.storage.v2.BidiWriteObjectResponse;
30+
import com.google.storage.v2.GetObjectRequest;
31+
import com.google.storage.v2.Object;
2932
import com.google.storage.v2.ServiceConstants.Values;
3033
import java.nio.ByteBuffer;
3134
import java.util.function.BiFunction;
@@ -192,10 +195,9 @@ BufferedAppendableUploadBuilder buffered(BufferHandle bufferHandle) {
192195
}
193196

194197
final class BufferedAppendableUploadBuilder {
195-
196198
private final BufferHandle bufferHandle;
197-
198199
private ApiFuture<BidiAppendableWrite> start;
200+
private UnaryCallable<GetObjectRequest, Object> get;
199201

200202
BufferedAppendableUploadBuilder(BufferHandle bufferHandle) {
201203
this.bufferHandle = bufferHandle;
@@ -210,6 +212,12 @@ BufferedAppendableUploadBuilder setStartAsync(ApiFuture<BidiAppendableWrite> sta
210212
return this;
211213
}
212214

215+
public BufferedAppendableUploadBuilder setGetCallable(
216+
UnaryCallable<GetObjectRequest, Object> get) {
217+
this.get = get;
218+
return this;
219+
}
220+
213221
BufferedWritableByteChannelSession<BidiWriteObjectResponse> build() {
214222
// it is theoretically possible that the setter methods for the following variables could
215223
// be called again between when this method is invoked and the resulting function is
@@ -220,6 +228,8 @@ BufferedWritableByteChannelSession<BidiWriteObjectResponse> build() {
220228
ByteStringStrategy boundStrategy = byteStringStrategy;
221229
Hasher boundHasher = hasher;
222230
RetrierWithAlg boundRetrier = retrier;
231+
UnaryCallable<GetObjectRequest, Object> boundGet =
232+
requireNonNull(get, "get must be non null");
223233
return new BufferedWriteSession<>(
224234
requireNonNull(start, "start must be non null"),
225235
((BiFunction<
@@ -229,6 +239,7 @@ BufferedWritableByteChannelSession<BidiWriteObjectResponse> build() {
229239
(start, resultFuture) ->
230240
new GapicBidiUnbufferedAppendableWritableByteChannel(
231241
write,
242+
boundGet,
232243
boundRetrier,
233244
resultFuture,
234245
new ChunkSegmenter(

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,6 +1472,7 @@ public boolean shouldRetry(
14721472
}))
14731473
.buffered(BufferHandle.allocate(bufferSize))
14741474
.setStartAsync(startAppendableWrite)
1475+
.setGetCallable(storageClient.getObjectCallable())
14751476
.build();
14761477
DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<
14771478
BufferedWritableByteChannel, BidiWriteObjectResponse>

0 commit comments

Comments
 (0)