diff --git a/.github/readme/synth.metadata/synth.metadata b/.github/readme/synth.metadata/synth.metadata
index a6979675dc..35c50eb639 100644
--- a/.github/readme/synth.metadata/synth.metadata
+++ b/.github/readme/synth.metadata/synth.metadata
@@ -4,14 +4,14 @@
"git": {
"name": ".",
"remote": "https://github.com/googleapis/java-storage.git",
- "sha": "ab3723dfa1674249f51f77f56b70fee57a7fd756"
+ "sha": "4c1a739e7314eddc73cfb9df004e65a10a614086"
}
},
{
"git": {
"name": "synthtool",
"remote": "https://github.com/googleapis/synthtool.git",
- "sha": "4dca4132c6d63788c6675e1b1e11e7b9225f8694"
+ "sha": "0199c79b8324fba66476300824aa931788c47e2d"
}
}
]
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc777d7656..50923c74b3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog
+### [1.113.12](https://www.github.com/googleapis/java-storage/compare/v1.113.11...v1.113.12) (2021-02-26)
+
+
+### Bug Fixes
+
+* retrying get remote offset and recover from last chunk failures. ([#726](https://www.github.com/googleapis/java-storage/issues/726)) ([b41b881](https://www.github.com/googleapis/java-storage/commit/b41b88109e13b5ebbd0393d1f264225c12876be6))
+
+
+### Dependencies
+
+* update dependency com.google.api-client:google-api-client to v1.31.2 ([#686](https://www.github.com/googleapis/java-storage/issues/686)) ([6b1f036](https://www.github.com/googleapis/java-storage/commit/6b1f0361376167719ec5456181134136d27d1d3c))
+* update dependency com.google.cloud:google-cloud-shared-dependencies to v0.20.0 ([#732](https://www.github.com/googleapis/java-storage/issues/732)) ([c98413d](https://www.github.com/googleapis/java-storage/commit/c98413df9d9514340aed78b5a4d5e596760bb616))
+* update kms.version to v0.87.7 ([#724](https://www.github.com/googleapis/java-storage/issues/724)) ([3229bd8](https://www.github.com/googleapis/java-storage/commit/3229bd860f3a4d700a969aa9e922bbf6b5c1ca10))
+* update kms.version to v0.87.8 ([#733](https://www.github.com/googleapis/java-storage/issues/733)) ([a21b75f](https://www.github.com/googleapis/java-storage/commit/a21b75fa846f373970298dd98f8f3520fc2b3c97))
+
### [1.113.11](https://www.github.com/googleapis/java-storage/compare/v1.113.10...v1.113.11) (2021-02-19)
diff --git a/README.md b/README.md
index 83f3028d3f..983257327f 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file
com.google.cloud
libraries-bom
- 16.4.0
+ 18.0.0
pom
import
@@ -38,25 +38,25 @@ If you are using Maven without BOM, add this to your dependencies:
com.google.cloud
google-cloud-storage
- 1.113.10
+ 1.113.11
```
If you are using Gradle 5.x or later, add this to your dependencies
```Groovy
-implementation platform('com.google.cloud:libraries-bom:16.4.0')
+implementation platform('com.google.cloud:libraries-bom:18.0.0')
compile 'com.google.cloud:google-cloud-storage'
```
If you are using Gradle without BOM, add this to your dependencies
```Groovy
-compile 'com.google.cloud:google-cloud-storage:1.113.10'
+compile 'com.google.cloud:google-cloud-storage:1.113.11'
```
If you are using SBT, add this to your dependencies
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "1.113.10"
+libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "1.113.11"
```
## Authentication
diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml
index 1f5c5ee1ed..0785817227 100644
--- a/google-cloud-storage/pom.xml
+++ b/google-cloud-storage/pom.xml
@@ -2,7 +2,7 @@
4.0.0
google-cloud-storage
- 1.113.11
+ 1.113.12
jar
Google Cloud Storage
https://github.com/googleapis/java-storage
@@ -12,11 +12,11 @@
com.google.cloud
google-cloud-storage-parent
- 1.113.11
+ 1.113.12
google-cloud-storage
- 0.87.6
+ 0.87.8
@@ -34,7 +34,7 @@
com.google.api-client
google-api-client
- 1.31.1
+ 1.31.2
com.google.apis
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
index ca87933c0f..aa5c3a8118 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java
@@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel {
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occuring.
private boolean retrying = false;
+ private boolean checkingForLastChunk = false;
boolean isRetrying() {
return retrying;
@@ -64,129 +65,141 @@ StorageObject getStorageObject() {
return storageObject;
}
+ private StorageObject transmitChunk(
+ int chunkOffset, int chunkLength, long position, boolean last) {
+ return getOptions()
+ .getStorageRpcV1()
+ .writeWithResponse(getUploadId(), getBuffer(), chunkOffset, position, chunkLength, last);
+ }
+
+ private long getRemotePosition() {
+ return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
+ }
+
+ private StorageObject getRemoteStorageObject() {
+ return getOptions().getStorageRpcV1().get(getEntity().toPb(), null);
+ }
+
+ private StorageException unrecoverableState(
+ int chunkOffset, int chunkLength, long localPosition, long remotePosition, boolean last) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Unable to recover in upload.\n");
+ sb.append(
+ "This may be a symptom of multiple clients uploading to the same upload session.\n\n");
+ sb.append("For debugging purposes:\n");
+ sb.append("uploadId: ").append(getUploadId()).append('\n');
+ sb.append("chunkOffset: ").append(chunkOffset).append('\n');
+ sb.append("chunkLength: ").append(chunkLength).append('\n');
+ sb.append("localOffset: ").append(localPosition).append('\n');
+ sb.append("remoteOffset: ").append(remotePosition).append('\n');
+ sb.append("lastChunk: ").append(last).append("\n\n");
+ return new StorageException(0, sb.toString());
+ }
+
+ // Retriable interruption occurred.
+ // Variables:
+ // chunk = getBuffer()
+ // localNextByteOffset == getPosition()
+ // chunkSize = getChunkSize()
+ //
+ // Case 1: localNextByteOffset == remoteNextByteOffset:
+ // Retrying the entire chunk
+ //
+ // Case 2: localNextByteOffset < remoteNextByteOffset
+ // && driftOffset < chunkSize:
+ // Upload progressed and localNextByteOffset is not in-sync with
+ // remoteNextByteOffset and driftOffset is less than chunkSize.
+ // driftOffset must be less than chunkSize for it to retry using
+ // chunk maintained in memory.
+ // Find the driftOffset by subtracting localNextByteOffset from
+ // remoteNextByteOffset.
+ // Use driftOffset to determine where to restart from using the chunk in
+ // memory.
+ //
+ // Case 3: localNextByteOffset < remoteNextByteOffset
+ // && driftOffset == chunkSize:
+ // Special case of Case 2.
+ // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
+ // to the next chunk.
+ //
+ // Case 4: localNextByteOffset < remoteNextByteOffset
+ // && driftOffset > chunkSize:
+ // Throw exception as remoteNextByteOffset has drifted beyond the retriable
+ // chunk maintained in memory. This is not possible unless there's multiple
+ // clients uploading to the same resumable upload session.
+ //
+ // Case 5: localNextByteOffset > remoteNextByteOffset:
+ // For completeness, this case is not possible because it would require retrying
+ // a 400 status code which is not allowed.
+ //
+ // Case 6: remoteNextByteOffset==-1 && last == true
+ // Upload is complete and retry occurred in the "last" chunk. Data sent was
+ // received by the service.
+ //
+ // Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk
+ // Not the last chunk and not yet checkingForLastChunk; allow the client to
+ // catch up to the final chunk, which is then handled by
+ // Case 6.
+ //
+ // Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk
+ // Not last chunk and checkingForLastChunk means this is the second time we
+ // hit this case, meaning the upload was completed by a different client.
+ //
+ // Case 9: Would only occur if the client's local offset advanced beyond the
+ // remote offset, which cannot happen.
+ //
@Override
- protected void flushBuffer(final int length, final boolean last) {
+ protected void flushBuffer(final int length, final boolean lastChunk) {
try {
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
+ // Get remote offset from API
+ final long localPosition = getPosition();
+ // For each request it should be possible to retry from its location in this code
+ final long remotePosition = isRetrying() ? getRemotePosition() : getPosition();
+ final int chunkOffset = (int) (remotePosition - localPosition);
+ final int chunkLength = length - chunkOffset;
+ final boolean uploadAlreadyComplete = remotePosition == -1;
+ // Enable isRetrying state to reduce number of calls to getRemotePosition()
if (!isRetrying()) {
- // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
retrying = true;
+ }
+ if (uploadAlreadyComplete && lastChunk) {
+ // Case 6
+ // Request object metadata if not available
+ if (storageObject == null) {
+ storageObject = getRemoteStorageObject();
+ }
+ // Verify that with the final chunk we match the blob length
+ if (storageObject.getSize().longValue() != getPosition() + length) {
+ throw unrecoverableState(
+ chunkOffset, chunkLength, localPosition, remotePosition, lastChunk);
+ }
+ retrying = false;
+ } else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk) {
+ // Case 7
+ // Make sure this is the second to last chunk.
+ checkingForLastChunk = true;
+ // Continue onto next chunk in case this is the last chunk
+ } else if (localPosition <= remotePosition && chunkOffset < getChunkSize()) {
+ // Case 1 && Case 2
+ // We are in a position to send a chunk
storageObject =
- getOptions()
- .getStorageRpcV1()
- .writeWithResponse(
- getUploadId(), getBuffer(), 0, getPosition(), length, last);
+ transmitChunk(chunkOffset, chunkLength, remotePosition, lastChunk);
+ retrying = false;
+ } else if (localPosition < remotePosition && chunkOffset == getChunkSize()) {
+ // Case 3
+ // Continue to next chunk to catch up with remotePosition we are one chunk
+ // behind
+ retrying = false;
} else {
- // Retriable interruption occurred.
- // Variables:
- // chunk = getBuffer()
- // localNextByteOffset == getPosition()
- // chunkSize = getChunkSize()
- //
- // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
- // we are retrying from first chunk start from 0 offset.
- //
- // Case 2: localNextByteOffset == remoteNextByteOffset:
- // Special case of Case 1 when a chunk is retried.
- //
- // Case 3: localNextByteOffset < remoteNextByteOffset
- // && driftOffset < chunkSize:
- // Upload progressed and localNextByteOffset is not in-sync with
- // remoteNextByteOffset and driftOffset is less than chunkSize.
- // driftOffset must be less than chunkSize for it to retry using
- // chunk maintained in memory.
- // Find the driftOffset by subtracting localNextByteOffset from
- // remoteNextByteOffset.
- // Use driftOffset to determine where to restart from using the chunk in
- // memory.
- //
- // Case 4: localNextByteOffset < remoteNextByteOffset
- // && driftOffset == chunkSize:
- // Special case of Case 3.
- // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
- // to the next chunk.
- //
- // Case 5: localNextByteOffset < remoteNextByteOffset
- // && driftOffset > chunkSize:
- // Throw exception as remoteNextByteOffset has drifted beyond the retriable
- // chunk maintained in memory. This is not possible unless there's multiple
- // clients uploading to the same resumable upload session.
- //
- // Case 6: localNextByteOffset > remoteNextByteOffset:
- // For completeness, this case is not possible because it would require retrying
- // a 400 status code which is not allowed.
- //
- // Case 7: remoteNextByteOffset==-1 && last == true
- // Upload is complete and retry occurred in the "last" chunk. Data sent was
- // received by the service.
- //
- // Case 8: remoteNextByteOffset==-1 && last == false
- // Upload was completed by another client because this retry did not occur
- // during the last chunk.
- //
- // Get remote offset from API
- long remoteNextByteOffset =
- getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
- long localNextByteOffset = getPosition();
- int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
- int retryChunkLength = length - driftOffset;
-
- if (localNextByteOffset == 0 && remoteNextByteOffset == 0
- || localNextByteOffset == remoteNextByteOffset) {
- // Case 1 and 2
- storageObject =
- getOptions()
- .getStorageRpcV1()
- .writeWithResponse(
- getUploadId(), getBuffer(), 0, getPosition(), length, last);
- } else if (localNextByteOffset < remoteNextByteOffset
- && driftOffset < getChunkSize()) {
- // Case 3
- storageObject =
- getOptions()
- .getStorageRpcV1()
- .writeWithResponse(
- getUploadId(),
- getBuffer(),
- driftOffset,
- remoteNextByteOffset,
- retryChunkLength,
- last);
- } else if (localNextByteOffset < remoteNextByteOffset
- && driftOffset == getChunkSize()) {
- // Case 4
- // Continue to next chunk
- retrying = false;
- return;
- } else if (localNextByteOffset < remoteNextByteOffset
- && driftOffset > getChunkSize()) {
- // Case 5
- StringBuilder sb = new StringBuilder();
- sb.append(
- "Remote offset has progressed beyond starting byte offset of next chunk.");
- sb.append(
- "This may be a symptom of multiple clients uploading to the same upload session.\n\n");
- sb.append("For debugging purposes:\n");
- sb.append("uploadId: ").append(getUploadId()).append('\n');
- sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
- sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
- sb.append("driftOffset: ").append(driftOffset).append("\n\n");
- throw new StorageException(0, sb.toString());
- } else if (remoteNextByteOffset == -1 && last) {
- // Case 7
- retrying = false;
- return;
- } else if (remoteNextByteOffset == -1 && !last) {
- // Case 8
- throw new StorageException(0, "Resumable upload is already complete.");
- }
+ // Case 4 && Case 8 && Case 9
+ throw unrecoverableState(
+ chunkOffset, chunkLength, localPosition, remotePosition, lastChunk);
}
- // Request was successful and retrying state is now disabled.
- retrying = false;
}
}),
getOptions().getRetrySettings(),
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
index 74977dc6ec..6df86cb6ad 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java
@@ -32,6 +32,7 @@
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
+import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.InputStreamContent;
import com.google.api.client.http.json.JsonHttpContent;
@@ -765,7 +766,8 @@ public long getCurrentUploadOffset(String uploadId) {
try {
response = httpRequest.execute();
int code = response.getStatusCode();
- if (code == 201 || code == 200) {
+ if (HttpStatusCodes.isSuccess(code)) {
+ // Upload completed successfully
return -1;
}
StringBuilder sb = new StringBuilder();
@@ -774,20 +776,18 @@ public long getCurrentUploadOffset(String uploadId) {
throw new StorageException(0, sb.toString());
} catch (HttpResponseException ex) {
int code = ex.getStatusCode();
- if (code == 308 && ex.getHeaders().getRange() == null) {
- // No progress has been made.
- return 0;
- } else if (code == 308 && ex.getHeaders().getRange() != null) {
+ if (code == 308) {
+ if (ex.getHeaders().getRange() == null) {
+ // No progress has been made.
+ return 0;
+ }
// API returns last byte received offset
String range = ex.getHeaders().getRange();
// Return next byte offset by adding 1 to last byte received offset
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
} else {
- // Not certain what went wrong
- StringBuilder sb = new StringBuilder();
- sb.append("Not sure what occurred. Here's debugging information:\n");
- sb.append("Response:\n").append(ex.toString()).append("\n\n");
- throw new StorageException(0, sb.toString());
+ // Something else occurred like a 5xx so translate and throw.
+ throw translate(ex);
}
} finally {
if (response != null) {
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java
index eefee77290..ba60a3c4d6 100644
--- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java
@@ -41,6 +41,7 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.math.BigInteger;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
@@ -69,7 +70,8 @@ public class BlobWriteChannelTest {
private static final Random RANDOM = new Random();
private static final String SIGNED_URL =
"http://www.test.com/test-bucket/test1.txt?GoogleAccessId=testClient-test@test.com&Expires=1553839761&Signature=MJUBXAZ7";
-
+ private static final StorageException socketClosedException =
+ new StorageException(new SocketException("Socket closed"));
private StorageOptions options;
private StorageRpcFactory rpcFactoryMock;
private StorageRpc storageRpcMock;
@@ -104,8 +106,8 @@ public void testCreate() {
@Test
public void testCreateRetryableError() {
- StorageException exception = new StorageException(new SocketException("Socket closed"));
- expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andThrow(exception);
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS))
+ .andThrow(socketClosedException);
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
@@ -136,7 +138,6 @@ public void testWriteWithoutFlush() throws IOException {
@Test
public void testWriteWithFlushRetryChunk() throws IOException {
- StorageException exception = new StorageException(new SocketException("Socket closed"));
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
@@ -148,7 +149,7 @@ public void testWriteWithFlushRetryChunk() throws IOException {
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
- .andThrow(exception);
+ .andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
expect(
storageRpcMock.writeWithResponse(
@@ -169,8 +170,45 @@ public void testWriteWithFlushRetryChunk() throws IOException {
}
@Test
- public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
- StorageException exception = new StorageException(new SocketException("Socket closed"));
+ public void testWriteWithRetryFullChunk() throws IOException {
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID), (byte[]) anyObject(), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andReturn(null);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ (byte[]) anyObject(),
+ eq(0),
+ eq((long) MIN_CHUNK_SIZE),
+ eq(0),
+ eq(true)))
+ .andReturn(BLOB_INFO.toPb());
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ writer.close();
+ assertFalse(writer.isOpen());
+ assertNotNull(writer.getStorageObject());
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ }
+
+ @Test
+ public void testWriteWithRemoteProgressMade() throws IOException {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
@@ -182,7 +220,8 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
- .andThrow(exception);
+ .andThrow(socketClosedException);
+ // Simulate GCS received 10 bytes but not the rest of the chunk
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L);
expect(
storageRpcMock.writeWithResponse(
@@ -203,8 +242,7 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
}
@Test
- public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
- StorageException exception = new StorageException(new SocketException("Socket closed"));
+ public void testWriteWithDriftRetryCase4() throws IOException {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
@@ -215,25 +253,102 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
- eq(true)))
- .andThrow(exception);
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn((long) MIN_CHUNK_SIZE);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq((long) MIN_CHUNK_SIZE),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andReturn(null);
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ capturedBuffer.reset();
+ buffer.rewind();
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ assertTrue(writer.isOpen());
+ assertNull(writer.getStorageObject());
+ }
+
+ @Test
+ public void testWriteWithUnreachableRemoteOffset() throws IOException {
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(MIN_CHUNK_SIZE + 10L);
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ try {
+ writer.write(buffer);
+ fail("Expected StorageException");
+ } catch (StorageException storageException) {
+ // expected storageException
+ }
+ assertTrue(writer.isOpen());
+ assertNull(writer.getStorageObject());
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ }
+
+ @Test
+ public void testWriteWithRetryAndObjectMetadata() throws IOException {
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(10),
+ eq(10L),
+ eq(MIN_CHUNK_SIZE - 10),
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
+ expect(storageRpcMock.get(BLOB_INFO.toPb(), null)).andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
+ expect(storageRpcMock.get(BLOB_INFO.toPb(), null))
+ .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
- assertFalse(writer.isRetrying());
assertFalse(writer.isOpen());
- // Capture captures entire buffer of a chunk even when not completely used.
- // Making assert selective up to the size of MIN_CHUNK_SIZE
- assertArrayEquals(Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE), buffer.array());
+ assertNotNull(writer.getStorageObject());
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}
@Test
- public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOException {
- StorageException exception = new StorageException(new SocketException("Socket closed"));
- StorageException completedException =
- new StorageException(0, "Resumable upload is already complete.");
+ public void testWriteWithUploadCompletedByAnotherClient() throws IOException {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
@@ -245,21 +360,145 @@ public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOE
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
- .andThrow(exception);
+ .andReturn(null);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq((long) MIN_CHUNK_SIZE),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
+ buffer.rewind();
+ writer.write(buffer);
+ buffer.rewind();
+ writer.write(buffer);
+ fail("Expected completed exception.");
+ } catch (StorageException ex) {
+
+ }
+ assertTrue(writer.isOpen());
+ }
+
+ @Test
+ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws IOException {
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andReturn(null);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq((long) MIN_CHUNK_SIZE),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ try {
+ writer.write(buffer);
+ buffer.rewind();
+ writer.write(buffer);
+ writer.close();
fail("Expected completed exception.");
} catch (StorageException ex) {
- assertEquals(ex, completedException);
}
- assertTrue(writer.isRetrying());
assertTrue(writer.isOpen());
}
+ @Test
+ public void testGetCurrentUploadOffset() throws IOException {
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(false)))
+ .andReturn(null);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ (byte[]) anyObject(),
+ eq(0),
+ eq((long) MIN_CHUNK_SIZE),
+ eq(0),
+ eq(true)))
+ .andReturn(BLOB_INFO.toPb());
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ writer.setChunkSize(MIN_CHUNK_SIZE);
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ writer.close();
+ assertFalse(writer.isOpen());
+ assertNotNull(writer.getStorageObject());
+ assertArrayEquals(buffer.array(), capturedBuffer.getValue());
+ }
+
+ @Test
+ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
+ ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
+ Capture capturedBuffer = Capture.newInstance();
+ expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
+ expect(
+ storageRpcMock.writeWithResponse(
+ eq(UPLOAD_ID),
+ capture(capturedBuffer),
+ eq(0),
+ eq(0L),
+ eq(MIN_CHUNK_SIZE),
+ eq(true)))
+ .andThrow(socketClosedException);
+ expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
+ expect(storageRpcMock.get(BLOB_INFO.toPb(), null))
+ .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
+ replay(storageRpcMock);
+ writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
+ assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
+ writer.close();
+ assertFalse(writer.isRetrying());
+ assertFalse(writer.isOpen());
+ assertNotNull(writer.getStorageObject());
+ // Capture captures entire buffer of a chunk even when not completely used.
+ // Making assert selective up to the size of MIN_CHUNK_SIZE
+ assertArrayEquals(Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE), buffer.array());
+ }
+
@Test
public void testWriteWithFlush() throws IOException {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
diff --git a/pom.xml b/pom.xml
index d0a073ce44..a13bd87af2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-storage-parent
pom
- 1.113.11
+ 1.113.12
Storage Parent
https://github.com/googleapis/java-storage
@@ -14,7 +14,7 @@
com.google.cloud
google-cloud-shared-config
- 0.10.0
+ 0.11.0
@@ -63,7 +63,7 @@
UTF-8
github
google-cloud-storage-parent
- 0.19.0
+ 0.20.0
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index 6e2f3c7b8a..02f4f5e9df 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -29,7 +29,7 @@
com.google.cloud
google-cloud-storage
- 1.113.10
+ 1.113.11
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 749bc35b98..76a8165225 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -28,7 +28,7 @@
com.google.cloud
google-cloud-storage
- 1.113.10
+ 1.113.11
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index f67951b57f..fb546e66f2 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -30,7 +30,7 @@
com.google.cloud
libraries-bom
- 16.4.0
+ 18.0.0
pom
import
diff --git a/versions.txt b/versions.txt
index a1e2f178f8..6d1e0399fc 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,4 +1,4 @@
# Format:
# module:released-version:current-version
-google-cloud-storage:1.113.11:1.113.11
\ No newline at end of file
+google-cloud-storage:1.113.12:1.113.12
\ No newline at end of file