2424import com .google .api .core .BetaApi ;
2525import com .google .api .core .InternalApi ;
2626import com .google .api .core .SettableApiFuture ;
27- import com .google .api .gax .retrying .BasicResultRetryAlgorithm ;
28- import com .google .api .gax .rpc .AbortedException ;
29- import com .google .api .gax .rpc .ApiException ;
3027import com .google .cloud .storage .BidiUploadState .AppendableUploadState ;
3128import com .google .cloud .storage .BidiUploadState .TakeoverAppendableUploadState ;
3229import com .google .cloud .storage .BlobAppendableUpload .AppendableUploadWriteableByteChannel ;
3633import com .google .cloud .storage .UnifiedOpts .ObjectTargetOpt ;
3734import com .google .cloud .storage .UnifiedOpts .Opts ;
3835import com .google .common .base .MoreObjects ;
39- import com .google .storage .v2 .BidiWriteObjectRequest ;
4036import com .google .storage .v2 .BidiWriteObjectResponse ;
41- import com .google .storage .v2 .Object ;
4237import com .google .storage .v2 .ServiceConstants .Values ;
4338import java .util .function .BiFunction ;
4439import javax .annotation .concurrent .Immutable ;
@@ -60,20 +55,17 @@ public final class BlobAppendableUploadConfig {
6055 new BlobAppendableUploadConfig (
6156 FlushPolicy .minFlushSize (_256KiB ),
6257 Hasher .enabled (),
63- CloseAction .CLOSE_WITHOUT_FINALIZING ,
64- false );
58+ CloseAction .CLOSE_WITHOUT_FINALIZING );
6559
6660 private final FlushPolicy flushPolicy ;
6761 private final Hasher hasher ;
6862 private final CloseAction closeAction ;
69- private final boolean newImpl ;
7063
7164 private BlobAppendableUploadConfig (
72- FlushPolicy flushPolicy , Hasher hasher , CloseAction closeAction , boolean newImpl ) {
65+ FlushPolicy flushPolicy , Hasher hasher , CloseAction closeAction ) {
7366 this .flushPolicy = flushPolicy ;
7467 this .hasher = hasher ;
7568 this .closeAction = closeAction ;
76- this .newImpl = newImpl ;
7769 }
7870
7971 /**
@@ -104,7 +96,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
10496 if (this .flushPolicy .equals (flushPolicy )) {
10597 return this ;
10698 }
107- return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction , newImpl );
99+ return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction );
108100 }
109101
110102 /**
@@ -134,7 +126,7 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
134126 if (this .closeAction == closeAction ) {
135127 return this ;
136128 }
137- return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction , newImpl );
129+ return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction );
138130 }
139131
140132 /**
@@ -166,7 +158,7 @@ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
166158 return this ;
167159 }
168160 return new BlobAppendableUploadConfig (
169- flushPolicy , enabled ? Hasher .enabled () : Hasher .noop (), closeAction , newImpl );
161+ flushPolicy , enabled ? Hasher .enabled () : Hasher .noop (), closeAction );
170162 }
171163
172164 /** Never to be made public until {@link Hasher} is public */
@@ -175,10 +167,6 @@ Hasher getHasher() {
175167 return hasher ;
176168 }
177169
178- BlobAppendableUploadConfig useNewImpl () {
179- return new BlobAppendableUploadConfig (flushPolicy , hasher , closeAction , true );
180- }
181-
182170 @ Override
183171 public String toString () {
184172 return MoreObjects .toStringHelper (this )
@@ -239,109 +227,50 @@ public enum CloseAction {
239227 }
240228
241229 BlobAppendableUpload create (GrpcStorageImpl storage , BlobInfo info , Opts <ObjectTargetOpt > opts ) {
242- if (newImpl ) {
243- // TODO: make configurable
244- int maxRedirectsAllowed = 3 ;
245-
246- long maxPendingBytes = this .getFlushPolicy ().getMaxPendingBytes ();
247- AppendableUploadState state = storage .getAppendableState (info , opts , maxPendingBytes );
248- WritableByteChannelSession <
249- AppendableObjectBufferedWritableByteChannel , BidiWriteObjectResponse >
250- build =
251- new AppendableSession (
252- ApiFutures .immediateFuture (state ),
253- (start , resultFuture ) -> {
254- BidiUploadStreamingStream stream =
255- new BidiUploadStreamingStream (
256- start ,
257- storage .storageDataClient .executor ,
258- storage .storageClient .bidiWriteObjectCallable (),
259- maxRedirectsAllowed ,
260- storage .storageDataClient .retryContextProvider .create ());
261- ChunkSegmenter chunkSegmenter =
262- new ChunkSegmenter (
263- Hasher .enabled (),
264- ByteStringStrategy .copy (),
265- Math .min (
266- Values .MAX_WRITE_CHUNK_BYTES_VALUE ,
267- Math .toIntExact (maxPendingBytes )),
268- /* blockSize= */ 1 );
269- BidiAppendableUnbufferedWritableByteChannel c ;
270- if (state instanceof TakeoverAppendableUploadState ) {
271- // start the takeover reconciliation
272- stream .awaitTakeoverStateReconciliation ();
273- c =
274- new BidiAppendableUnbufferedWritableByteChannel (
275- stream , chunkSegmenter , state .getConfirmedBytes ());
276- } else {
277- c =
278- new BidiAppendableUnbufferedWritableByteChannel (
279- stream , chunkSegmenter , 0 );
280- }
281- return new AppendableObjectBufferedWritableByteChannel (
282- flushPolicy .createBufferedChannel (c ),
283- c ,
284- this .closeAction == CloseAction .FINALIZE_WHEN_CLOSING ,
285- newImpl );
286- },
287- state .getResultFuture ());
230+ // TODO: make configurable
231+ int maxRedirectsAllowed = 3 ;
288232
289- return new BlobAppendableUploadImpl (
290- new DefaultBlobWriteSessionConfig .DecoratedWritableByteChannelSession <>(
291- build , BidiBlobWriteSessionConfig .Factory .WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER ));
292- } else {
293- boolean takeOver = info .getGeneration () != null ;
294- BidiWriteObjectRequest req =
295- takeOver
296- ? storage .getBidiWriteObjectRequestForTakeover (info , opts )
297- : storage .getBidiWriteObjectRequest (info , opts );
233+ long maxPendingBytes = this .getFlushPolicy ().getMaxPendingBytes ();
234+ AppendableUploadState state = storage .getAppendableState (info , opts , maxPendingBytes );
235+ WritableByteChannelSession <AppendableObjectBufferedWritableByteChannel , BidiWriteObjectResponse >
236+ build =
237+ new AppendableSession (
238+ ApiFutures .immediateFuture (state ),
239+ (start , resultFuture ) -> {
240+ BidiUploadStreamingStream stream =
241+ new BidiUploadStreamingStream (
242+ start ,
243+ storage .storageDataClient .executor ,
244+ storage .storageClient .bidiWriteObjectCallable (),
245+ maxRedirectsAllowed ,
246+ storage .storageDataClient .retryContextProvider .create ());
247+ ChunkSegmenter chunkSegmenter =
248+ new ChunkSegmenter (
249+ Hasher .enabled (),
250+ ByteStringStrategy .copy (),
251+ Math .min (
252+ Values .MAX_WRITE_CHUNK_BYTES_VALUE , Math .toIntExact (maxPendingBytes )),
253+ /* blockSize= */ 1 );
254+ BidiAppendableUnbufferedWritableByteChannel c ;
255+ if (state instanceof TakeoverAppendableUploadState ) {
256+ // start the takeover reconciliation
257+ stream .awaitTakeoverStateReconciliation ();
258+ c =
259+ new BidiAppendableUnbufferedWritableByteChannel (
260+ stream , chunkSegmenter , state .getConfirmedBytes ());
261+ } else {
262+ c = new BidiAppendableUnbufferedWritableByteChannel (stream , chunkSegmenter , 0 );
263+ }
264+ return new AppendableObjectBufferedWritableByteChannel (
265+ flushPolicy .createBufferedChannel (c ),
266+ c ,
267+ this .closeAction == CloseAction .FINALIZE_WHEN_CLOSING );
268+ },
269+ state .getResultFuture ());
298270
299- BidiAppendableWrite baw = new BidiAppendableWrite (req , takeOver );
300-
301- WritableByteChannelSession <
302- AppendableObjectBufferedWritableByteChannel , BidiWriteObjectResponse >
303- build =
304- ResumableMedia .gapic ()
305- .write ()
306- .bidiByteChannel (storage .storageClient .bidiWriteObjectCallable ())
307- .setHasher (this .getHasher ())
308- .setByteStringStrategy (ByteStringStrategy .copy ())
309- .appendable ()
310- .withRetryConfig (
311- storage .retrier .withAlg (
312- new BasicResultRetryAlgorithm <Object >() {
313- @ Override
314- public boolean shouldRetry (
315- Throwable previousThrowable , Object previousResponse ) {
316- // TODO: remove this later once the redirects are not handled by the
317- // retry loop
318- ApiException apiEx = null ;
319- if (previousThrowable instanceof StorageException ) {
320- StorageException se = (StorageException ) previousThrowable ;
321- Throwable cause = se .getCause ();
322- if (cause instanceof ApiException ) {
323- apiEx = (ApiException ) cause ;
324- }
325- }
326- if (apiEx instanceof AbortedException ) {
327- return true ;
328- }
329- return storage
330- .retryAlgorithmManager
331- .idempotent ()
332- .shouldRetry (previousThrowable , null );
333- }
334- }))
335- .buffered (this .getFlushPolicy ())
336- .setStartAsync (ApiFutures .immediateFuture (baw ))
337- .setGetCallable (storage .storageClient .getObjectCallable ())
338- .setFinalizeOnClose (this .closeAction == CloseAction .FINALIZE_WHEN_CLOSING )
339- .build ();
340-
341- return new BlobAppendableUploadImpl (
342- new DefaultBlobWriteSessionConfig .DecoratedWritableByteChannelSession <>(
343- build , BidiBlobWriteSessionConfig .Factory .WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER ));
344- }
271+ return new BlobAppendableUploadImpl (
272+ new DefaultBlobWriteSessionConfig .DecoratedWritableByteChannelSession <>(
273+ build , BidiBlobWriteSessionConfig .Factory .WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER ));
345274 }
346275
347276 private static final class AppendableSession
0 commit comments