2222import com .google .api .gax .rpc .ApiStreamObserver ;
2323import com .google .api .gax .rpc .BidiStreamingCallable ;
2424import com .google .api .gax .rpc .ErrorDetails ;
25+ import com .google .api .gax .rpc .NotFoundException ;
2526import com .google .api .gax .rpc .OutOfRangeException ;
27+ import com .google .api .gax .rpc .UnaryCallable ;
2628import com .google .cloud .storage .ChunkSegmenter .ChunkSegment ;
2729import com .google .cloud .storage .Conversions .Decoder ;
2830import com .google .cloud .storage .Crc32cValue .Crc32cLengthKnown ;
3335import com .google .common .collect .ImmutableList ;
3436import com .google .common .collect .ImmutableMap ;
3537import com .google .protobuf .ByteString ;
38+ import com .google .protobuf .FieldMask ;
3639import com .google .storage .v2 .AppendObjectSpec ;
3740import com .google .storage .v2 .BidiWriteHandle ;
3841import com .google .storage .v2 .BidiWriteObjectRedirectedError ;
3942import com .google .storage .v2 .BidiWriteObjectRequest ;
4043import com .google .storage .v2 .BidiWriteObjectResponse ;
4144import com .google .storage .v2 .ChecksummedData ;
45+ import com .google .storage .v2 .GetObjectRequest ;
46+ import com .google .storage .v2 .Object ;
4247import com .google .storage .v2 .ObjectChecksums ;
4348import java .io .IOException ;
4449import java .nio .ByteBuffer ;
6166final class GapicBidiUnbufferedAppendableWritableByteChannel
6267 implements UnbufferedWritableByteChannel {
6368 private final BidiStreamingCallable <BidiWriteObjectRequest , BidiWriteObjectResponse > write ;
69+ private final UnaryCallable <GetObjectRequest , Object > get ;
6470 private final RetrierWithAlg retrier ;
6571 private final SettableApiFuture <BidiWriteObjectResponse > resultFuture ;
6672 private final ChunkSegmenter chunkSegmenter ;
@@ -87,16 +93,17 @@ final class GapicBidiUnbufferedAppendableWritableByteChannel
8793
8894 GapicBidiUnbufferedAppendableWritableByteChannel (
8995 BidiStreamingCallable <BidiWriteObjectRequest , BidiWriteObjectResponse > write ,
96+ UnaryCallable <GetObjectRequest , Object > get ,
9097 RetrierWithAlg retrier ,
9198 SettableApiFuture <BidiWriteObjectResponse > resultFuture ,
9299 ChunkSegmenter chunkSegmenter ,
93100 BidiWriteCtx <BidiAppendableWrite > writeCtx ,
94101 Supplier <GrpcCallContext > baseContextSupplier ) {
95102 this .write = write ;
103+ this .get = get ;
96104 this .retrier = retrier ;
97105 this .resultFuture = resultFuture ;
98106 this .chunkSegmenter = chunkSegmenter ;
99-
100107 this .writeCtx = writeCtx ;
101108 this .responseObserver = new RedirectHandlingResponseObserver (new BidiObserver ());
102109 this .baseContextSupplier = baseContextSupplier ;
@@ -163,16 +170,24 @@ void restart() {
163170 if (!resultFuture .isDone ()) {
164171 ApiStreamObserver <BidiWriteObjectRequest > requestStream1 =
165172 openedStream (reconnectArguments .getCtx ());
166- requestStream1 .onNext (req );
167- lastWrittenRequest = req ;
168- responseObserver .await ();
169- first = false ;
173+ if (req != null ) {
174+ requestStream1 .onNext (req );
175+ lastWrittenRequest = req ;
176+ responseObserver .await ();
177+ first = false ;
178+ } else {
179+ // This means we did a metadata lookup and determined that GCS never received the initial
180+ // WriteObjectSpec,
181+ // So we can just start over and send it again
182+ first = true ;
183+ }
170184 }
171185 }
172186
173187 public void startAppendableTakeoverStream () {
174188 BidiWriteObjectRequest req =
175189 writeCtx .newRequestBuilder ().setFlush (true ).setStateLookup (true ).build ();
190+ generation .set (req .getAppendObjectSpec ().getGeneration ());
176191 this .messages = Collections .singletonList (req );
177192 flush ();
178193 first = false ;
@@ -259,9 +274,9 @@ private BidiWriteObjectRequest finishMessage() {
259274 Crc32cLengthKnown crc32cValue = writeCtx .getCumulativeCrc32c ().get ();
260275
261276 BidiWriteObjectRequest .Builder b = writeCtx .newRequestBuilder ();
262- if (! first ) {
263- b .clearUploadId ().clearObjectChecksums ().clearWriteObjectSpec ().clearAppendObjectSpec ();
264- }
277+
278+ b .clearUploadId ().clearObjectChecksums ().clearWriteObjectSpec ().clearAppendObjectSpec ();
279+
265280 b .setFinishWrite (true ).setWriteOffset (offset );
266281 if (crc32cValue != null ) {
267282 b .setObjectChecksums (ObjectChecksums .newBuilder ().setCrc32C (crc32cValue .getValue ()).build ());
@@ -299,6 +314,7 @@ private void flush() {
299314 try {
300315 ApiStreamObserver <BidiWriteObjectRequest > opened = openedStream (context );
301316 for (BidiWriteObjectRequest message : this .messages ) {
317+
302318 opened .onNext (message );
303319 lastWrittenRequest = message ;
304320 }
@@ -509,6 +525,7 @@ public void onCompleted() {
509525
510526 private void ok (BidiWriteObjectResponse value ) {
511527 last = value ;
528+ first = false ;
512529 sem .release ();
513530 }
514531
@@ -685,6 +702,42 @@ ReconnectArguments getReconnectArguments() {
685702 long generation = this .generation .get ();
686703 if (generation > 0 ) {
687704 spec .setGeneration (generation );
705+ } else {
706+ GetObjectRequest req =
707+ GetObjectRequest .newBuilder ()
708+ .setBucket (spec .getBucket ())
709+ .setObject (spec .getObject ())
710+ .setReadMask (
711+ FieldMask .newBuilder ()
712+ .addPaths (Storage .BlobField .GENERATION .getGrpcName ())
713+ .build ())
714+ .build ();
715+ boolean objectNotFound = false ;
716+ try {
717+ retrier .run (
718+ () -> {
719+ this .generation .set (get .call (req ).getGeneration ());
720+ return null ;
721+ },
722+ Decoder .identity ());
723+ } catch (Throwable t ) {
724+ if (t .getCause () instanceof NotFoundException ) {
725+ objectNotFound = true ;
726+ } else {
727+ t .addSuppressed (new AsyncStorageTaskException ());
728+ throw t ;
729+ }
730+ }
731+ generation = this .generation .get ();
732+ if (generation > 0 ) {
733+ spec .setGeneration (generation );
734+ } else if (objectNotFound ) {
735+ // If the object wasn't found, that means GCS never saw the initial WriteObjectSpec, which
736+ // means we'll need
737+ // to send it again. We can process this retry by just starting over again
738+ return ReconnectArguments .of (
739+ baseContextSupplier .get ().withExtraHeaders (getHeaders ()), null );
740+ }
688741 }
689742
690743 BidiWriteHandle bidiWriteHandle = this .bidiWriteHandle .get ();
0 commit comments