diff --git a/.github/readme/synth.metadata/synth.metadata b/.github/readme/synth.metadata/synth.metadata
index f8ff395fd8..4a44631afe 100644
--- a/.github/readme/synth.metadata/synth.metadata
+++ b/.github/readme/synth.metadata/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-storage.git",
- "sha": "bacc7b75898c3d4054214da069588ad4957b5d39"
+ "sha": "3ac92c97d371756d7abe0c56419092047b4a4c1b"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "2c8aecedd55b0480fb4e123b6e07fa5b12953862"
+ "sha": "7db8a6c5ffb12a6e4c2f799c18f00f7f3d60e279"
}
}
]
diff --git a/.github/workflows/formatting.yaml b/.github/workflows/formatting.yaml
index d4d367cfce..6844407b4d 100644
--- a/.github/workflows/formatting.yaml
+++ b/.github/workflows/formatting.yaml
@@ -18,7 +18,7 @@ jobs:
with:
java-version: 11
- run: "mvn com.coveo:fmt-maven-plugin:format"
- - uses: googleapis/code-suggester@v1.8.0
+ - uses: googleapis/code-suggester@v1
with:
command: review
pull_number: ${{ github.event.pull_request.number }}
diff --git a/.kokoro/readme.sh b/.kokoro/readme.sh
index 65bff26aba..013ce452f6 100755
--- a/.kokoro/readme.sh
+++ b/.kokoro/readme.sh
@@ -28,9 +28,18 @@ echo "https://${GITHUB_TOKEN}:@github.com" >> ~/.git-credentials
git config --global credential.helper 'store --file ~/.git-credentials'
python3.6 -m pip install git+https://github.com/googleapis/synthtool.git#egg=gcp-synthtool
+
+set +e
python3.6 -m autosynth.synth \
--repository=googleapis/java-storage \
--synth-file-name=.github/readme/synth.py \
--metadata-path=.github/readme/synth.metadata \
--pr-title="chore: regenerate README" \
- --branch-suffix="readme"
\ No newline at end of file
+ --branch-suffix="readme"
+
+# autosynth returns 28 to signal there are no changes
+RETURN_CODE=$?
+if [[ ${RETURN_CODE} -ne 0 && ${RETURN_CODE} -ne 28 ]]
+then
+ exit ${RETURN_CODE}
+fi
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7bb999d38..0ccaff6c12 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,17 @@
# Changelog
+### [1.113.4](https://www.github.com/googleapis/java-storage/compare/v1.113.3...v1.113.4) (2020-11-13)
+
+
+### Bug Fixes
+
+* retry using remote offset ([#604](https://www.github.com/googleapis/java-storage/issues/604)) ([216b52c](https://www.github.com/googleapis/java-storage/commit/216b52c54d34eaf1307788809a3512c461adf381))
+
+
+### Dependencies
+
+* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.15.0 ([#610](https://www.github.com/googleapis/java-storage/issues/610)) ([ac65e5b](https://www.github.com/googleapis/java-storage/commit/ac65e5b0bd324d5726504bb3405c758675a56ddc))
+
### [1.113.3](https://www.github.com/googleapis/java-storage/compare/v1.113.2...v1.113.3) (2020-11-06)
diff --git a/README.md b/README.md
index 2f99ca640b..4b797d17f6 100644
--- a/README.md
+++ b/README.md
@@ -38,18 +38,18 @@ If you are using Maven without BOM, add this to your dependencies:
com.google.cloud
google-cloud-storage
- 1.113.2
+ 1.113.3
```
If you are using Gradle, add this to your dependencies
```Groovy
-compile 'com.google.cloud:google-cloud-storage:1.113.2'
+compile 'com.google.cloud:google-cloud-storage:1.113.3'
```
If you are using SBT, add this to your dependencies
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "1.113.2"
+libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "1.113.3"
```
## Authentication
diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml
index ba97a975ce..02722fac60 100644
--- a/google-cloud-storage/clirr-ignored-differences.xml
+++ b/google-cloud-storage/clirr-ignored-differences.xml
@@ -26,4 +26,9 @@
com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long)
7013
+
+ com/google/cloud/storage/spi/v1/StorageRpc
+ long getCurrentUploadOffset(java.lang.String)
+ 7012
+
diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml
index 279e25c441..32e8e9c8ba 100644
--- a/google-cloud-storage/pom.xml
+++ b/google-cloud-storage/pom.xml
@@ -2,7 +2,7 @@
4.0.0
google-cloud-storage
- 1.113.3
+ 1.113.4
jar
Google Cloud Storage
https://github.com/googleapis/java-storage
@@ -12,7 +12,7 @@
com.google.cloud
google-cloud-storage-parent
- 1.113.3
+ 1.113.4
google-cloud-storage
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
index 0c9520849b..2e9c0a3804 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
@@ -51,6 +51,15 @@ class BlobWriteChannel extends BaseWriteChannel {
// Contains metadata of the updated object or null if upload is not completed.
private StorageObject storageObject;
+ // Detect if flushBuffer() is being retried or not.
+ // TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
+ // occuring.
+ private boolean retrying = false;
+
+ boolean isRetrying() {
+ return retrying;
+ }
+
StorageObject getStorageObject() {
return storageObject;
}
@@ -63,11 +72,105 @@ protected void flushBuffer(final int length, final boolean last) {
new Runnable() {
@Override
public void run() {
- storageObject =
- getOptions()
- .getStorageRpcV1()
- .writeWithResponse(
- getUploadId(), getBuffer(), 0, getPosition(), length, last);
+ if (!isRetrying()) {
+ // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
+ retrying = true;
+ storageObject =
+ getOptions()
+ .getStorageRpcV1()
+ .writeWithResponse(
+ getUploadId(), getBuffer(), 0, getPosition(), length, last);
+ } else {
+ // Retriable interruption occurred.
+ // Variables:
+ // chunk = getBuffer()
+ // localNextByteOffset == getPosition()
+ // chunkSize = getChunkSize()
+ //
+ // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
+ // we are retrying from first chunk start from 0 offset.
+ //
+ // Case 2: localNextByteOffset == remoteNextByteOffset:
+ // Special case of Case 1 when a chunk is retried.
+ //
+ // Case 3: localNextByteOffset < remoteNextByteOffset
+ // && driftOffset < chunkSize:
+ // Upload progressed and localNextByteOffset is not in-sync with
+ // remoteNextByteOffset and driftOffset is less than chunkSize.
+ // driftOffset must be less than chunkSize for it to retry using
+ // chunk maintained in memory.
+ // Find the driftOffset by subtracting localNextByteOffset from
+ // remoteNextByteOffset.
+ // Use driftOffset to determine where to restart from using the chunk in
+ // memory.
+ //
+ // Case 4: localNextByteOffset < remoteNextByteOffset
+ // && driftOffset == chunkSize:
+ // Special case of Case 3.
+ // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
+ // to the next chunk.
+ //
+ // Case 5: localNextByteOffset < remoteNextByteOffset
+ // && driftOffset > chunkSize:
+ // Throw exception as remoteNextByteOffset has drifted beyond the retriable
+ // chunk maintained in memory. This is not possible unless there's multiple
+ // clients uploading to the same resumable upload session.
+ //
+ // Case 6: localNextByteOffset > remoteNextByteOffset:
+ // For completeness, this case is not possible because it would require retrying
+ // a 400 status code which is not allowed.
+ //
+ // Get remote offset from API
+ long remoteNextByteOffset =
+ getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
+ long localNextByteOffset = getPosition();
+ int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
+ int retryChunkLength = length - driftOffset;
+
+ if (localNextByteOffset == 0 && remoteNextByteOffset == 0
+ || localNextByteOffset == remoteNextByteOffset) {
+ // Case 1 and 2
+ storageObject =
+ getOptions()
+ .getStorageRpcV1()
+ .writeWithResponse(
+ getUploadId(), getBuffer(), 0, getPosition(), length, last);
+ } else if (localNextByteOffset < remoteNextByteOffset
+ && driftOffset < getChunkSize()) {
+ // Case 3
+ storageObject =
+ getOptions()
+ .getStorageRpcV1()
+ .writeWithResponse(
+ getUploadId(),
+ getBuffer(),
+ driftOffset,
+ remoteNextByteOffset,
+ retryChunkLength,
+ last);
+ } else if (localNextByteOffset < remoteNextByteOffset
+ && driftOffset == getChunkSize()) {
+ // Case 4
+ // Continue to next chunk
+ retrying = false;
+ return;
+ } else {
+ // Case 5
+ StringBuilder sb = new StringBuilder();
+ sb.append(
+ "Remote offset has progressed beyond starting byte offset of next chunk.");
+ sb.append(
+ "This may be a symptom of multiple clients uploading to the same upload session.\n\n");
+ sb.append("For debugging purposes:\n");
+ sb.append("uploadId: ").append(getUploadId()).append('\n');
+ sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
+ sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
+ sb.append("driftOffset: ").append(driftOffset).append("\n\n");
+ throw new StorageException(0, sb.toString());
+ }
+ }
+ // Request was successful and retrying state is now disabled.
+ retrying = false;
}
}),
getOptions().getRetrySettings(),
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
index 0960f91ffb..faba82b1de 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
@@ -747,6 +747,51 @@ public void write(
writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last);
}
+ @Override
+ public long getCurrentUploadOffset(String uploadId) {
+ try {
+ GenericUrl url = new GenericUrl(uploadId);
+ HttpRequest httpRequest = storage.getRequestFactory().buildPutRequest(url, null);
+
+ httpRequest.getHeaders().setContentRange("bytes */*");
+ // Turn off automatic redirects.
+ // HTTP 308 are returned if upload is incomplete.
+ // See: https://cloud.google.com/storage/docs/performing-resumable-uploads
+ httpRequest.setFollowRedirects(false);
+
+ HttpResponse response = null;
+ try {
+ response = httpRequest.execute();
+ int code = response.getStatusCode();
+ String message = response.getStatusMessage();
+ if (code == 201 || code == 200) {
+ throw new StorageException(0, "Resumable upload is already complete.");
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("Not sure what occurred. Here's debugging information:\n");
+ sb.append("Response:\n").append(response.toString()).append("\n\n");
+ throw new StorageException(0, sb.toString());
+ } catch (HttpResponseException ex) {
+ int code = ex.getStatusCode();
+ if (code == 308 && ex.getHeaders().getRange() == null) {
+ // No progress has been made.
+ return 0;
+ } else {
+ // API returns last byte received offset
+ String range = ex.getHeaders().getRange();
+ // Return next byte offset by adding 1 to last byte received offset
+ return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
+ }
+ } finally {
+ if (response != null) {
+ response.disconnect();
+ }
+ }
+ } catch (IOException ex) {
+ throw translate(ex);
+ }
+ }
+
@Override
public StorageObject writeWithResponse(
String uploadId,
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java
index 2088c15bec..cc4dd5740c 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/StorageRpc.java
@@ -328,6 +328,15 @@ void write(
int length,
boolean last);
+ /**
+ * Requests current byte offset from Cloud Storage API. Used to recover from a failure in some
+ * bytes were committed successfully to the open resumable session.
+ *
+ * @param uploadId resumable upload ID URL
+ * @throws StorageException upon failure
+ */
+ long getCurrentUploadOffset(String uploadId);
+
/**
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
* returns metadata of the updated object, otherwise returns null.
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java
index 7733f13eb6..b4479682a5 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/testing/StorageRpcTestBase.java
@@ -139,6 +139,11 @@ public void write(
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public long getCurrentUploadOffset(String uploadId) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
public StorageObject writeWithResponse(
String uploadId,
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java
index a18345be89..b6d7bbf3a3 100644
--- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java
@@ -134,6 +134,74 @@ public void testWriteWithoutFlush() throws IOException {
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}
+ @Test
+ public void testWriteWithFlushRetryChunk() throws IOException {
+ StorageException exception = new StorageException(new SocketException("Socket closed"));
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(exception);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andReturn(null);
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ assertTrue(writer.isOpen());
+ assertNull(writer.getStorageObject());
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ }
+
+ @Test
+ public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
+ StorageException exception = new StorageException(new SocketException("Socket closed"));
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(exception);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(10),
+ eq(10L),
+ eq(MIN_CHUNK_SIZE - 10),
+ eq(false)))
+ .andReturn(null);
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ assertTrue(writer.isOpen());
+ assertNull(writer.getStorageObject());
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ }
+
@Test
public void testWriteWithFlush() throws IOException {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
diff --git a/pom.xml b/pom.xml
index 6bd003cf86..b52e86548b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-storage-parent
pom
- 1.113.3
+ 1.113.4
Storage Parent
https://github.com/googleapis/java-storage
@@ -63,7 +63,7 @@
UTF-8
github
google-cloud-storage-parent
- 0.14.1
+ 0.15.0
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index fe8f70f1a8..99bf72f8e9 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -29,7 +29,7 @@
com.google.cloud
google-cloud-storage
- 1.113.2
+ 1.113.3
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 81d033eaf2..6a6fdaebfb 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
google-cloud-storage
- 1.113.2
+ 1.113.3
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 9c0f70a262..7910681e22 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -30,7 +30,7 @@
com.google.cloud
libraries-bom
- 15.0.0
+ 15.1.0
pom
import
diff --git a/synth.metadata b/synth.metadata
index 5e7ee883cf..4acdcb8395 100644
--- a/synth.metadata
+++ b/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-storage.git",
- "sha": "b39a67551866fb8e09755e4918f30f92754ab297"
+ "sha": "ac65e5b0bd324d5726504bb3405c758675a56ddc"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "c7824ea48ff6d4d42dfae0849aec8a85acd90bd9"
+ "sha": "7d652819519dfa24da9e14548232e4aaba71a11c"
}
}
],
diff --git a/versions.txt b/versions.txt
index 11ef215259..697d9c4621 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,4 +1,4 @@
# Format:
# module:released-version:current-version
-google-cloud-storage:1.113.3:1.113.3
\ No newline at end of file
+google-cloud-storage:1.113.4:1.113.4
\ No newline at end of file