@@ -51,6 +51,15 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
5151 // Contains metadata of the updated object or null if upload is not completed.
5252 private StorageObject storageObject ;
5353
54+ // Detect if flushBuffer() is being retried or not.
55+ // TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
56+ // occuring.
57+ private boolean retrying = false ;
58+
59+ boolean isRetrying () {
60+ return retrying ;
61+ }
62+
5463 StorageObject getStorageObject () {
5564 return storageObject ;
5665 }
@@ -63,11 +72,105 @@ protected void flushBuffer(final int length, final boolean last) {
6372 new Runnable () {
6473 @ Override
6574 public void run () {
66- storageObject =
67- getOptions ()
68- .getStorageRpcV1 ()
69- .writeWithResponse (
70- getUploadId (), getBuffer (), 0 , getPosition (), length , last );
75+ if (!isRetrying ()) {
76+ // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
77+ retrying = true ;
78+ storageObject =
79+ getOptions ()
80+ .getStorageRpcV1 ()
81+ .writeWithResponse (
82+ getUploadId (), getBuffer (), 0 , getPosition (), length , last );
83+ } else {
84+ // Retriable interruption occurred.
85+ // Variables:
86+ // chunk = getBuffer()
87+ // localNextByteOffset == getPosition()
88+ // chunkSize = getChunkSize()
89+ //
90+ // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
91+ // we are retrying from first chunk start from 0 offset.
92+ //
93+ // Case 2: localNextByteOffset == remoteNextByteOffset:
94+ // Special case of Case 1 when a chunk is retried.
95+ //
96+ // Case 3: localNextByteOffset < remoteNextByteOffset
97+ // && driftOffset < chunkSize:
98+ // Upload progressed and localNextByteOffset is not in-sync with
99+ // remoteNextByteOffset and driftOffset is less than chunkSize.
100+ // driftOffset must be less than chunkSize for it to retry using
101+ // chunk maintained in memory.
102+ // Find the driftOffset by subtracting localNextByteOffset from
103+ // remoteNextByteOffset.
104+ // Use driftOffset to determine where to restart from using the chunk in
105+ // memory.
106+ //
107+ // Case 4: localNextByteOffset < remoteNextByteOffset
108+ // && driftOffset == chunkSize:
109+ // Special case of Case 3.
110+ // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
111+ // to the next chunk.
112+ //
113+ // Case 5: localNextByteOffset < remoteNextByteOffset
114+ // && driftOffset > chunkSize:
115+ // Throw exception as remoteNextByteOffset has drifted beyond the retriable
116+ // chunk maintained in memory. This is not possible unless there's multiple
117+ // clients uploading to the same resumable upload session.
118+ //
119+ // Case 6: localNextByteOffset > remoteNextByteOffset:
120+ // For completeness, this case is not possible because it would require retrying
121+ // a 400 status code which is not allowed.
122+ //
123+ // Get remote offset from API
124+ long remoteNextByteOffset =
125+ getOptions ().getStorageRpcV1 ().getCurrentUploadOffset (getUploadId ());
126+ long localNextByteOffset = getPosition ();
127+ int driftOffset = (int ) (remoteNextByteOffset - localNextByteOffset );
128+ int retryChunkLength = length - driftOffset ;
129+
130+ if (localNextByteOffset == 0 && remoteNextByteOffset == 0
131+ || localNextByteOffset == remoteNextByteOffset ) {
132+ // Case 1 and 2
133+ storageObject =
134+ getOptions ()
135+ .getStorageRpcV1 ()
136+ .writeWithResponse (
137+ getUploadId (), getBuffer (), 0 , getPosition (), length , last );
138+ } else if (localNextByteOffset < remoteNextByteOffset
139+ && driftOffset < getChunkSize ()) {
140+ // Case 3
141+ storageObject =
142+ getOptions ()
143+ .getStorageRpcV1 ()
144+ .writeWithResponse (
145+ getUploadId (),
146+ getBuffer (),
147+ driftOffset ,
148+ remoteNextByteOffset ,
149+ retryChunkLength ,
150+ last );
151+ } else if (localNextByteOffset < remoteNextByteOffset
152+ && driftOffset == getChunkSize ()) {
153+ // Case 4
154+ // Continue to next chunk
155+ retrying = false ;
156+ return ;
157+ } else {
158+ // Case 5
159+ StringBuilder sb = new StringBuilder ();
160+ sb .append (
161+ "Remote offset has progressed beyond starting byte offset of next chunk." );
162+ sb .append (
163+ "This may be a symptom of multiple clients uploading to the same upload session.\n \n " );
164+ sb .append ("For debugging purposes:\n " );
165+ sb .append ("uploadId: " ).append (getUploadId ()).append ('\n' );
166+ sb .append ("localNextByteOffset: " ).append (localNextByteOffset ).append ('\n' );
167+ sb .append ("remoteNextByteOffset: " ).append (remoteNextByteOffset ).append ('\n' );
168+ sb .append ("driftOffset: " ).append (driftOffset ).append ("\n \n " );
169+ throw new StorageException (0 , sb .toString ());
170+ }
171+ }
172+ // Request was successful and retrying state is now disabled.
173+ retrying = false ;
71174 }
72175 }),
73176 getOptions ().getRetrySettings (),
0 commit comments