diff --git a/.github/workflows/hermetic_library_generation.yaml b/.github/workflows/hermetic_library_generation.yaml
index 519c2f8bfa..33eb75a8a0 100644
--- a/.github/workflows/hermetic_library_generation.yaml
+++ b/.github/workflows/hermetic_library_generation.yaml
@@ -38,12 +38,12 @@ jobs:
else
echo "SHOULD_RUN=true" >> $GITHUB_ENV
fi
- - uses: actions/checkout@v4
+ - uses: actions/checkout@v5
if: env.SHOULD_RUN == 'true'
with:
fetch-depth: 0
token: ${{ secrets.CLOUD_JAVA_BOT_TOKEN }}
- - uses: googleapis/sdk-platform-java/.github/scripts@v2.61.0
+ - uses: googleapis/sdk-platform-java/.github/scripts@v2.62.0
if: env.SHOULD_RUN == 'true'
with:
base_ref: ${{ github.base_ref }}
diff --git a/.github/workflows/unmanaged_dependency_check.yaml b/.github/workflows/unmanaged_dependency_check.yaml
index d7ae36c028..c9d370b95d 100644
--- a/.github/workflows/unmanaged_dependency_check.yaml
+++ b/.github/workflows/unmanaged_dependency_check.yaml
@@ -5,7 +5,7 @@ jobs:
unmanaged_dependency_check:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v5
- uses: actions/setup-java@v3
with:
distribution: temurin
@@ -17,6 +17,6 @@ jobs:
# repository
.kokoro/build.sh
- name: Unmanaged dependency check
- uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.51.0
+ uses: googleapis/sdk-platform-java/java-shared-dependencies/unmanaged-dependency-check@google-cloud-shared-dependencies/v3.52.0
with:
bom-path: google-cloud-storage-bom/pom.xml
diff --git a/.kokoro/presubmit/graalvm-native-a.cfg b/.kokoro/presubmit/graalvm-native-a.cfg
index 783727ef01..5816d61073 100644
--- a/.kokoro/presubmit/graalvm-native-a.cfg
+++ b/.kokoro/presubmit/graalvm-native-a.cfg
@@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
- value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.51.0" # {x-version-update:google-cloud-shared-dependencies:current}
+ value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_a:3.52.0" # {x-version-update:google-cloud-shared-dependencies:current}
}
env_vars: {
diff --git a/.kokoro/presubmit/graalvm-native-b.cfg b/.kokoro/presubmit/graalvm-native-b.cfg
index 83c7afee07..7986fd6731 100644
--- a/.kokoro/presubmit/graalvm-native-b.cfg
+++ b/.kokoro/presubmit/graalvm-native-b.cfg
@@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
- value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.51.0" # {x-version-update:google-cloud-shared-dependencies:current}
+ value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_b:3.52.0" # {x-version-update:google-cloud-shared-dependencies:current}
}
env_vars: {
diff --git a/.kokoro/presubmit/graalvm-native-c.cfg b/.kokoro/presubmit/graalvm-native-c.cfg
index 3a9bbf8c3a..acecfce1bf 100644
--- a/.kokoro/presubmit/graalvm-native-c.cfg
+++ b/.kokoro/presubmit/graalvm-native-c.cfg
@@ -3,7 +3,7 @@
# Configure the docker image for kokoro-trampoline.
env_vars: {
key: "TRAMPOLINE_IMAGE"
- value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_c:3.51.0" # {x-version-update:google-cloud-shared-dependencies:current}
+ value: "gcr.io/cloud-devrel-public-resources/graalvm_sdk_platform_c:3.52.0" # {x-version-update:google-cloud-shared-dependencies:current}
}
env_vars: {
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b51e08214f..6cb0a9ae77 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,29 @@
# Changelog
+## [2.56.0](https://github.com/googleapis/java-storage/compare/v2.55.0...v2.56.0) (2025-08-25)
+
+
+### Features
+
+* *breaking behavior* rewrite Storage.blobAppendableUpload to be non-blocking and have improved throughput ([#3231](https://github.com/googleapis/java-storage/issues/3231)) ([7bd73d3](https://github.com/googleapis/java-storage/commit/7bd73d3104f5c47299f5a9c8d68dec82933eeda5))
+* Add AppendableUploadWriteableByteChannel#flush() ([#3261](https://github.com/googleapis/java-storage/issues/3261)) ([950c56f](https://github.com/googleapis/java-storage/commit/950c56f0e622d75faff51257d5cbc9f3ddc7e1ce))
+* Add MinFlushSizeFlushPolicy#withMaxPendingBytes(long) ([#3231](https://github.com/googleapis/java-storage/issues/3231)) ([7bd73d3](https://github.com/googleapis/java-storage/commit/7bd73d3104f5c47299f5a9c8d68dec82933eeda5))
+* Add StorageChannelUtils to provide helper methods to perform blocking read/write to/from non-blocking channels ([#3231](https://github.com/googleapis/java-storage/issues/3231)) ([7bd73d3](https://github.com/googleapis/java-storage/commit/7bd73d3104f5c47299f5a9c8d68dec82933eeda5))
+
+
+### Bug Fixes
+
+* Make FlushPolicy${Min,Max}FlushSizeFlushPolicy constructors private ([#3217](https://github.com/googleapis/java-storage/issues/3217)) ([7bd73d3](https://github.com/googleapis/java-storage/commit/7bd73d3104f5c47299f5a9c8d68dec82933eeda5))
+* Update BlobAppendableUploadConfig and FlushPolicy.MinFlushSizeFlushPolicy to default to 4MiB minFlushSize and 16MiB maxPendingBytes ([#3249](https://github.com/googleapis/java-storage/issues/3249)) ([7bd73d3](https://github.com/googleapis/java-storage/commit/7bd73d3104f5c47299f5a9c8d68dec82933eeda5))
+* Update otel integration to properly activate span context for lazy RPCs such as reads & writes ([#3255](https://github.com/googleapis/java-storage/issues/3255)) ([d6587f4](https://github.com/googleapis/java-storage/commit/d6587f42b65a586a2e3f30e0559975801726a812))
+
+
+### Dependencies
+
+* Update actions/checkout action to v5 ([#3239](https://github.com/googleapis/java-storage/issues/3239)) ([33f024b](https://github.com/googleapis/java-storage/commit/33f024b1ae094bf3e3605e1a835cb55eb5c9e750))
+* Update dependency com.google.apis:google-api-services-storage to v1-rev20250815-2.0.0 ([#3245](https://github.com/googleapis/java-storage/issues/3245)) ([87afe1a](https://github.com/googleapis/java-storage/commit/87afe1ac5f500053e4c0639d5b824304d03796f4))
+* Update dependency com.google.cloud:sdk-platform-java-config to v3.52.0 ([#3250](https://github.com/googleapis/java-storage/issues/3250)) ([0782e62](https://github.com/googleapis/java-storage/commit/0782e62fc9534e3cecfaaa4d78b58904ecf699d6))
+
## [2.55.0](https://github.com/googleapis/java-storage/compare/v2.54.0...v2.55.0) (2025-08-05)
diff --git a/README.md b/README.md
index 05340f4a40..045ffb52a6 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file:
com.google.cloudlibraries-bom
- 26.65.0
+ 26.66.0pomimport
@@ -46,12 +46,12 @@ If you are using Maven without the BOM, add this to your dependencies:
com.google.cloudgoogle-cloud-storage
- 2.54.0
+ 2.55.0com.google.cloudgoogle-cloud-storage-control
- 2.54.0
+ 2.55.0
```
@@ -59,20 +59,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:
```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.65.0')
+implementation platform('com.google.cloud:libraries-bom:26.66.0')
implementation 'com.google.cloud:google-cloud-storage'
```
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-storage:2.55.0'
+implementation 'com.google.cloud:google-cloud-storage:2.56.0'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.55.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.56.0"
```
## Authentication
@@ -523,7 +523,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-storage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-storage.svg
-[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.55.0
+[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.56.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
diff --git a/gapic-google-cloud-storage-v2/pom.xml b/gapic-google-cloud-storage-v2/pom.xml
index 0d97691561..86c78e9b9c 100644
--- a/gapic-google-cloud-storage-v2/pom.xml
+++ b/gapic-google-cloud-storage-v2/pom.xml
@@ -4,13 +4,13 @@
4.0.0com.google.api.grpcgapic-google-cloud-storage-v2
- 2.55.0
+ 2.56.0gapic-google-cloud-storage-v2GRPC library for gapic-google-cloud-storage-v2com.google.cloudgoogle-cloud-storage-parent
- 2.55.0
+ 2.56.0
diff --git a/gapic-google-cloud-storage-v2/src/main/java/com/google/storage/v2/stub/GrpcStorageStub.java b/gapic-google-cloud-storage-v2/src/main/java/com/google/storage/v2/stub/GrpcStorageStub.java
index bd0747d9b6..060ffb20d3 100644
--- a/gapic-google-cloud-storage-v2/src/main/java/com/google/storage/v2/stub/GrpcStorageStub.java
+++ b/gapic-google-cloud-storage-v2/src/main/java/com/google/storage/v2/stub/GrpcStorageStub.java
@@ -90,6 +90,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/DeleteBucket")
.setRequestMarshaller(ProtoUtils.marshaller(DeleteBucketRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Empty.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor getBucketMethodDescriptor =
@@ -98,6 +99,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/GetBucket")
.setRequestMarshaller(ProtoUtils.marshaller(GetBucketRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Bucket.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor createBucketMethodDescriptor =
@@ -106,6 +108,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/CreateBucket")
.setRequestMarshaller(ProtoUtils.marshaller(CreateBucketRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Bucket.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -116,6 +119,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(ProtoUtils.marshaller(ListBucketsRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListBucketsResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -126,6 +130,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(
ProtoUtils.marshaller(LockBucketRetentionPolicyRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Bucket.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor getIamPolicyMethodDescriptor =
@@ -134,6 +139,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/GetIamPolicy")
.setRequestMarshaller(ProtoUtils.marshaller(GetIamPolicyRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Policy.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor setIamPolicyMethodDescriptor =
@@ -142,6 +148,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/SetIamPolicy")
.setRequestMarshaller(ProtoUtils.marshaller(SetIamPolicyRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Policy.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -153,6 +160,7 @@ public class GrpcStorageStub extends StorageStub {
ProtoUtils.marshaller(TestIamPermissionsRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(TestIamPermissionsResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor updateBucketMethodDescriptor =
@@ -161,6 +169,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/UpdateBucket")
.setRequestMarshaller(ProtoUtils.marshaller(UpdateBucketRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Bucket.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -171,6 +180,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(
ProtoUtils.marshaller(ComposeObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Object.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor deleteObjectMethodDescriptor =
@@ -179,6 +189,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/DeleteObject")
.setRequestMarshaller(ProtoUtils.marshaller(DeleteObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Empty.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -189,6 +200,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(
ProtoUtils.marshaller(RestoreObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Object.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -200,6 +212,7 @@ public class GrpcStorageStub extends StorageStub {
ProtoUtils.marshaller(CancelResumableWriteRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(CancelResumableWriteResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor getObjectMethodDescriptor =
@@ -208,6 +221,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/GetObject")
.setRequestMarshaller(ProtoUtils.marshaller(GetObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Object.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -217,6 +231,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/ReadObject")
.setRequestMarshaller(ProtoUtils.marshaller(ReadObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(ReadObjectResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -228,6 +243,7 @@ public class GrpcStorageStub extends StorageStub {
ProtoUtils.marshaller(BidiReadObjectRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(BidiReadObjectResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor updateObjectMethodDescriptor =
@@ -236,6 +252,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/UpdateObject")
.setRequestMarshaller(ProtoUtils.marshaller(UpdateObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Object.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -246,6 +263,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(ProtoUtils.marshaller(WriteObjectRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(WriteObjectResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -257,6 +275,7 @@ public class GrpcStorageStub extends StorageStub {
ProtoUtils.marshaller(BidiWriteObjectRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(BidiWriteObjectResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -267,6 +286,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(ProtoUtils.marshaller(ListObjectsRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListObjectsResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -277,6 +297,7 @@ public class GrpcStorageStub extends StorageStub {
.setRequestMarshaller(
ProtoUtils.marshaller(RewriteObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(RewriteResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -288,6 +309,7 @@ public class GrpcStorageStub extends StorageStub {
ProtoUtils.marshaller(StartResumableWriteRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(StartResumableWriteResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -299,6 +321,7 @@ public class GrpcStorageStub extends StorageStub {
ProtoUtils.marshaller(QueryWriteStatusRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(QueryWriteStatusResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor moveObjectMethodDescriptor =
@@ -307,6 +330,7 @@ public class GrpcStorageStub extends StorageStub {
.setFullMethodName("google.storage.v2.Storage/MoveObject")
.setRequestMarshaller(ProtoUtils.marshaller(MoveObjectRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Object.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private final UnaryCallable deleteBucketCallable;
diff --git a/generation_config.yaml b/generation_config.yaml
index eb49ac610b..a063b9170d 100644
--- a/generation_config.yaml
+++ b/generation_config.yaml
@@ -1,6 +1,6 @@
-gapic_generator_version: 2.61.0
-googleapis_commitish: 8c74a4f9ad52cfd7a7d1f6427fa0a0511377a395
-libraries_bom_version: 26.65.0
+gapic_generator_version: 2.62.0
+googleapis_commitish: 3b2a2ae91db23a9c879b2b725d6a5de6bd64a800
+libraries_bom_version: 26.66.0
libraries:
- api_shortname: storage
name_pretty: Cloud Storage
diff --git a/google-cloud-storage-bom/pom.xml b/google-cloud-storage-bom/pom.xml
index 87397bf767..ba57b70c08 100644
--- a/google-cloud-storage-bom/pom.xml
+++ b/google-cloud-storage-bom/pom.xml
@@ -19,12 +19,12 @@
4.0.0com.google.cloudgoogle-cloud-storage-bom
- 2.55.0
+ 2.56.0pomcom.google.cloudsdk-platform-java-config
- 3.51.0
+ 3.52.0
@@ -69,37 +69,37 @@
com.google.cloudgoogle-cloud-storage
- 2.55.0
+ 2.56.0com.google.api.grpcgapic-google-cloud-storage-v2
- 2.55.0
+ 2.56.0com.google.api.grpcgrpc-google-cloud-storage-v2
- 2.55.0
+ 2.56.0com.google.api.grpcproto-google-cloud-storage-v2
- 2.55.0
+ 2.56.0com.google.cloudgoogle-cloud-storage-control
- 2.55.0
+ 2.56.0com.google.api.grpcgrpc-google-cloud-storage-control-v2
- 2.55.0
+ 2.56.0com.google.api.grpcproto-google-cloud-storage-control-v2
- 2.55.0
+ 2.56.0
diff --git a/google-cloud-storage-control/pom.xml b/google-cloud-storage-control/pom.xml
index f004cfe0af..0a29a76216 100644
--- a/google-cloud-storage-control/pom.xml
+++ b/google-cloud-storage-control/pom.xml
@@ -5,13 +5,13 @@
4.0.0com.google.cloudgoogle-cloud-storage-control
- 2.55.0
+ 2.56.0google-cloud-storage-controlGRPC library for google-cloud-storage-controlcom.google.cloudgoogle-cloud-storage-parent
- 2.55.0
+ 2.56.0
diff --git a/google-cloud-storage-control/src/main/java/com/google/storage/control/v2/stub/GrpcStorageControlStub.java b/google-cloud-storage-control/src/main/java/com/google/storage/control/v2/stub/GrpcStorageControlStub.java
index 78876be707..19f8d1916a 100644
--- a/google-cloud-storage-control/src/main/java/com/google/storage/control/v2/stub/GrpcStorageControlStub.java
+++ b/google-cloud-storage-control/src/main/java/com/google/storage/control/v2/stub/GrpcStorageControlStub.java
@@ -88,6 +88,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setFullMethodName("google.storage.control.v2.StorageControl/CreateFolder")
.setRequestMarshaller(ProtoUtils.marshaller(CreateFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Folder.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor deleteFolderMethodDescriptor =
@@ -96,6 +97,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setFullMethodName("google.storage.control.v2.StorageControl/DeleteFolder")
.setRequestMarshaller(ProtoUtils.marshaller(DeleteFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Empty.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor getFolderMethodDescriptor =
@@ -104,6 +106,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setFullMethodName("google.storage.control.v2.StorageControl/GetFolder")
.setRequestMarshaller(ProtoUtils.marshaller(GetFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Folder.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -114,6 +117,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(ProtoUtils.marshaller(ListFoldersRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListFoldersResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -123,6 +127,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setFullMethodName("google.storage.control.v2.StorageControl/RenameFolder")
.setRequestMarshaller(ProtoUtils.marshaller(RenameFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Operation.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -133,6 +138,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(GetStorageLayoutRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(StorageLayout.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -143,6 +149,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(CreateManagedFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(ManagedFolder.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -153,6 +160,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(DeleteManagedFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Empty.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -163,6 +171,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(GetManagedFolderRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(ManagedFolder.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -174,6 +183,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
ProtoUtils.marshaller(ListManagedFoldersRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListManagedFoldersResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -184,6 +194,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(CreateAnywhereCacheRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Operation.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -194,6 +205,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(UpdateAnywhereCacheRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Operation.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -204,6 +216,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(DisableAnywhereCacheRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(AnywhereCache.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -214,6 +227,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(PauseAnywhereCacheRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(AnywhereCache.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -224,6 +238,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(ResumeAnywhereCacheRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(AnywhereCache.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -234,6 +249,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(GetAnywhereCacheRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(AnywhereCache.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -245,6 +261,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
ProtoUtils.marshaller(ListAnywhereCachesRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ListAnywhereCachesResponse.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -256,6 +273,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(GetProjectIntelligenceConfigRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(IntelligenceConfig.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -268,6 +286,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
ProtoUtils.marshaller(
UpdateProjectIntelligenceConfigRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(IntelligenceConfig.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -279,6 +298,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(GetFolderIntelligenceConfigRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(IntelligenceConfig.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor
@@ -290,6 +310,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
.setRequestMarshaller(
ProtoUtils.marshaller(UpdateFolderIntelligenceConfigRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(IntelligenceConfig.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor<
@@ -304,6 +325,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
ProtoUtils.marshaller(
GetOrganizationIntelligenceConfigRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(IntelligenceConfig.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private static final MethodDescriptor<
@@ -318,6 +340,7 @@ public class GrpcStorageControlStub extends StorageControlStub {
ProtoUtils.marshaller(
UpdateOrganizationIntelligenceConfigRequest.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(IntelligenceConfig.getDefaultInstance()))
+ .setSampledToLocalTracing(true)
.build();
private final UnaryCallable createFolderCallable;
diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml
index 0af49c35a7..9cb223aebc 100644
--- a/google-cloud-storage/clirr-ignored-differences.xml
+++ b/google-cloud-storage/clirr-ignored-differences.xml
@@ -161,4 +161,35 @@
com.google.cloud.storage.BucketInfo$Builder setGoogleManagedEncryptionEnforcementConfig(com.google.cloud.storage.BucketInfo$GoogleManagedEncryptionEnforcementConfig)
+
+
+ 7004
+ com/google/cloud/storage/FlushPolicy$MinFlushSizeFlushPolicy
+ FlushPolicy$MinFlushSizeFlushPolicy(int)
+
+
+ 7009
+ com/google/cloud/storage/FlushPolicy$MinFlushSizeFlushPolicy
+ FlushPolicy$MinFlushSizeFlushPolicy(int)
+
+
+ 7009
+ com/google/cloud/storage/FlushPolicy$MaxFlushSizeFlushPolicy
+ FlushPolicy$MaxFlushSizeFlushPolicy(int)
+
+
+
+ 7012
+ com/google/cloud/storage/BlobAppendableUpload$AppendableUploadWriteableByteChannel
+ int write(java.nio.ByteBuffer)
+
+
+
+
+ 7012
+ com/google/cloud/storage/BlobAppendableUpload$AppendableUploadWriteableByteChannel
+ void flush()
+
+
+
diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml
index 1f3decd8ab..d8e343f743 100644
--- a/google-cloud-storage/pom.xml
+++ b/google-cloud-storage/pom.xml
@@ -2,7 +2,7 @@
4.0.0google-cloud-storage
- 2.55.0
+ 2.56.0jarGoogle Cloud Storagehttps://github.com/googleapis/java-storage
@@ -12,7 +12,7 @@
com.google.cloudgoogle-cloud-storage-parent
- 2.55.0
+ 2.56.0google-cloud-storage
@@ -239,14 +239,14 @@
com.google.api.grpcproto-google-cloud-kms-v1
- 0.164.0
+ 0.166.0testcom.google.cloudgoogle-cloud-kms
- 2.73.0
+ 2.75.0test
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java
new file mode 100644
index 0000000000..28663f813b
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
+import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+final class BidiAppendableUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
+
+ private final BidiUploadStreamingStream stream;
+ private final ChunkSegmenter chunkSegmenter;
+
+ private boolean open;
+ private long writeOffset;
+ private volatile boolean nextWriteShouldFinalize;
+ private boolean writeCalledAtLeastOnce;
+
+ /** If write throws an error, don't attempt to finalize things when {@link #close()} is called. */
+ private boolean writeThrewError;
+
+ BidiAppendableUnbufferedWritableByteChannel(
+ BidiUploadStreamingStream stream, ChunkSegmenter chunkSegmenter, long writeOffset) {
+ this.stream = stream;
+ this.chunkSegmenter = chunkSegmenter;
+ this.open = true;
+ this.writeOffset = writeOffset;
+ this.nextWriteShouldFinalize = false;
+ this.writeThrewError = false;
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
+ return internalWrite(srcs, srcsOffset, srcsLength);
+ }
+
+ @Override
+ public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ long totalRemaining = Buffers.totalRemaining(srcs, offset, length);
+ // internalWrite is non-blocking, but close is blocking.
+ // loop here to ensure all the bytes we need flush are enqueued before we transition to trying
+ // to close.
+ long written = 0;
+ do {
+ written += internalWrite(srcs, offset, length);
+ } while (written < totalRemaining);
+ close();
+ return written;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!open) {
+ return;
+ }
+ try {
+ if (writeThrewError) {
+ return;
+ }
+
+ if (!writeCalledAtLeastOnce) {
+ stream.flush();
+ }
+ if (nextWriteShouldFinalize) {
+ //noinspection StatementWithEmptyBody
+ while (!stream.finishWrite(writeOffset)) {}
+ } else {
+ //noinspection StatementWithEmptyBody
+ while (!stream.closeStream(writeOffset)) {}
+ }
+
+ awaitResultFuture();
+ } finally {
+ stream.sendClose();
+ open = false;
+ }
+ }
+
+ public void nextWriteShouldFinalize() {
+ this.nextWriteShouldFinalize = true;
+ }
+
+ void flush() throws InterruptedException {
+ stream.flush();
+ stream.awaitAckOf(writeOffset);
+ }
+
+ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+ // error early. if the result future is already failed, await it to throw the error
+ if (stream.getResultFuture().isDone()) {
+ awaitResultFuture();
+ return 0;
+ }
+ writeCalledAtLeastOnce = true;
+
+ long availableCapacity = stream.availableCapacity();
+ if (availableCapacity <= 0) {
+ return 0;
+ }
+ RewindableContent rewindableContent = RewindableContent.of(srcs, srcsOffset, srcsLength);
+ long totalBufferRemaining = rewindableContent.getLength();
+
+ ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, true);
+ if (data.length == 0) {
+ return 0;
+ }
+ // we consumed some bytes from srcs, flag our content as dirty since we aren't writing
+ // those bytes to implicitly flag as dirty.
+ rewindableContent.flagDirty();
+
+ long bytesConsumed = 0;
+ for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) {
+ ChunkSegment datum = data[i];
+ int size = datum.getB().size();
+ boolean appended;
+ if (i < lastIdx) {
+ appended = stream.append(datum);
+ } else if (i == lastIdx && nextWriteShouldFinalize) {
+ appended = stream.appendAndFinalize(datum);
+ } else {
+ appended = stream.appendAndFlush(datum);
+ }
+ if (appended) {
+ bytesConsumed += size;
+ writeOffset += size;
+ } else {
+ // if we weren't able to trigger a flush by reaching the end of the array and calling
+ // appendAndFlush, explicitly call flush here so that some progress can be made.
+ // we prefer appendAndFlush so a separate message is not needed, but an extra message
+ // in order to make progress and free buffer space is better than ending up in a live-lock.
+ stream.flush();
+ break;
+ }
+ }
+
+ if (bytesConsumed != totalBufferRemaining) {
+ rewindableContent.rewindTo(bytesConsumed);
+ }
+
+ return bytesConsumed;
+ }
+
+ private void awaitResultFuture() throws IOException {
+ try {
+ stream.getResultFuture().get(10_717, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ InterruptedIOException ioe = new InterruptedIOException();
+ ioe.initCause(e);
+ writeThrewError = true;
+ throw ioe;
+ } catch (ExecutionException e) {
+ BaseServiceException coalesce = StorageException.coalesce(e.getCause());
+ String message = coalesce.getMessage();
+ String ioExceptionMessage = message;
+ // if the failure is an upload scenario we detect client side, it's message will be
+ // verbose. To avoid duplication, select the first line only for the io exception
+ int firstNewLineIndex = message != null ? message.indexOf('\n') : -1;
+ if (firstNewLineIndex > -1) {
+ ioExceptionMessage = message.substring(0, firstNewLineIndex);
+ }
+ IOException ioException = new IOException(ioExceptionMessage, coalesce);
+ // ioException.addSuppressed(new AsyncStorageTaskException());
+ writeThrewError = true;
+ throw ioException;
+ } catch (TimeoutException e) {
+ writeThrewError = true;
+ throw new IOException(e);
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
index b0e5ce639d..5cd7a8a650 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
@@ -111,7 +111,8 @@ public WritableByteChannelSession, BlobInfo> writeSession(
GrpcStorageImpl grpc = (GrpcStorageImpl) s;
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
- BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts);
+ BidiWriteObjectRequest req =
+ grpc.getBidiWriteObjectRequest(info, opts, false);
ApiFuture startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req, opts);
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
index 18e7cfff96..0f5a378f80 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
@@ -94,73 +94,3 @@ static BidiResumableWrite identity(BidiResumableWrite w) {
return w;
}
}
-
-final class BidiAppendableWrite implements BidiWriteObjectRequestBuilderFactory {
-
- private final BidiWriteObjectRequest req;
-
- public BidiAppendableWrite(BidiWriteObjectRequest req) {
- this(req, false);
- }
-
- public BidiAppendableWrite(BidiWriteObjectRequest req, boolean takeOver) {
- if (takeOver) {
- this.req = req;
- } else {
- req =
- req.toBuilder()
- .setWriteObjectSpec(req.getWriteObjectSpec().toBuilder().setAppendable(true).build())
- .build();
- this.req = req;
- }
- }
-
- public BidiWriteObjectRequest getReq() {
- return req;
- }
-
- @Override
- public BidiWriteObjectRequest.Builder newBuilder() {
- return req.toBuilder();
- }
-
- @Override
- public @Nullable String bucketName() {
- if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
- return req.getWriteObjectSpec().getResource().getBucket();
- } else if (req.hasAppendObjectSpec()) {
- return req.getAppendObjectSpec().getBucket();
- }
- return null;
- }
-
- @Override
- public String toString() {
- return "BidiAppendableWrite{" + "req=" + fmtProto(req) + '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof BidiAppendableWrite)) {
- return false;
- }
- BidiAppendableWrite BidiAppendableWrite = (BidiAppendableWrite) o;
- return Objects.equals(req, BidiAppendableWrite.getReq());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(req);
- }
-
- /**
- * Helper function which is more specific than {@link Function#identity()}. Constraining the input
- * and output to be exactly {@link BidiAppendableWrite}.
- */
- static BidiAppendableWrite identity(BidiAppendableWrite w) {
- return w;
- }
-}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java
new file mode 100644
index 0000000000..08ed0c414f
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadState.java
@@ -0,0 +1,1136 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto;
+import static com.google.cloud.storage.Utils.ifNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.OneofDescriptor;
+import com.google.storage.v2.AppendObjectSpec;
+import com.google.storage.v2.BidiWriteHandle;
+import com.google.storage.v2.BidiWriteObjectRedirectedError;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.BidiWriteObjectResponse;
+import com.google.storage.v2.ChecksummedData;
+import com.google.storage.v2.WriteObjectSpec;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
+
+@SuppressWarnings("LoggingSimilarMessage")
+abstract class BidiUploadState {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BidiUploadState.class);
+ private static final Marker TRACE_ENTER = MarkerFactory.getMarker("enter");
+ private static final Marker TRACE_EXIT = MarkerFactory.getMarker("exit");
+
+ static final OneofDescriptor FIRST_MESSAGE_DESCRIPTOR =
+ BidiWriteObjectRequest.getDescriptor().getOneofs().stream()
+ .filter(d -> "first_message".equalsIgnoreCase(d.getName()))
+ .findFirst()
+ .orElseThrow(
+ () -> new IllegalStateException("BidiWriteObject.first_message oneof not found"));
+
+ // seal this class to extension
+ private BidiUploadState() {}
+
+ @VisibleForTesting
+ BidiUploadState(String testName) {
+ // some runtime enforcement that this constructor is only called from a test
+ // if we had java9+ we could seal this all the way without this hack
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ boolean isJunitTest =
+ Arrays.stream(stackTrace).anyMatch(ste -> ste.getClassName().startsWith("org.junit"));
+
+ checkState(isJunitTest, "not a junit test", testName);
+ }
+
+ protected final StorageException err(
+ UploadFailureScenario scenario, BidiWriteObjectResponse response) {
+ BidiWriteObjectRequest t = peekLast();
+ GrpcCallContext ctx = enqueueFirstMessageAndGetGrpcCallContext();
+ return scenario.toStorageException(Utils.nullSafeList(t), response, ctx, null);
+ }
+
+ @Nullable Crc32cLengthKnown getCumulativeCrc32c() {
+ return unimplemented();
+ }
+
+ long getTotalSentBytes() {
+ return unimplemented();
+ }
+
+ long getConfirmedBytes() {
+ return unimplemented();
+ }
+
+ long availableCapacity() {
+ return unimplemented();
+ }
+
+ boolean offer(ChunkSegmenter.@NonNull ChunkSegment data) {
+ return unimplemented();
+ }
+
+ boolean finalFlush(long totalLength) {
+ return unimplemented();
+ }
+
+ boolean offer(@NonNull BidiWriteObjectRequest e) {
+ return unimplemented();
+ }
+
+ void updateStateFromResponse(BidiWriteObjectResponse response) {
+ unimplemented();
+ }
+
+ @NonNull GrpcCallContext enqueueFirstMessageAndGetGrpcCallContext() {
+ return unimplemented();
+ }
+
+ void sendVia(Consumer consumer) {
+ unimplemented();
+ }
+
+ void updateFromRedirect(@NonNull BidiWriteObjectRedirectedError redirect) {
+ unimplemented();
+ }
+
+ void terminalError() {
+ unimplemented();
+ }
+
+ void pendingRetry() {
+ unimplemented();
+ }
+
+ void retrying() {
+ unimplemented();
+ }
+
+ @Nullable BidiWriteObjectRequest peekLast() {
+ return unimplemented();
+ }
+
+ boolean isFinalizing() {
+ return unimplemented();
+ }
+
+ ApiFuture beginReconciliation() {
+ return unimplemented();
+ }
+
+ static AppendableUploadState appendableNew(
+ BidiWriteObjectRequest initial,
+ Supplier baseCallContext,
+ long maxBytes,
+ SettableApiFuture resultFuture,
+ @Nullable Crc32cLengthKnown initialCrc32c) {
+ checkArgument(
+ initial.hasWriteObjectSpec(), "provided initial request did not contain a WriteObjectSpec");
+ WriteObjectSpec spec = initial.getWriteObjectSpec();
+ return new NewAppendableUploadState(
+ initial, spec, baseCallContext, maxBytes, resultFuture, initialCrc32c);
+ }
+
+ static AppendableUploadState appendableTakeover(
+ BidiWriteObjectRequest initial,
+ Supplier baseCallContext,
+ long maxBytes,
+ SettableApiFuture resultFuture,
+ @Nullable Crc32cLengthKnown initialCrc32c) {
+ checkArgument(
+ initial.hasAppendObjectSpec(),
+ "provided initial request did not contain a AppendableObjectSpec");
+ AppendObjectSpec spec = initial.getAppendObjectSpec();
+ return new TakeoverAppendableUploadState(
+ initial, spec, baseCallContext, maxBytes, resultFuture, initialCrc32c);
+ }
+
+ private static ImmutableMap> makeHeadersMap(
+ Stream xGoogRequestParamsEntries) {
+ return ImmutableMap.of(
+ "x-goog-request-params",
+ ImmutableList.of(
+ xGoogRequestParamsEntries.filter(Objects::nonNull).collect(Collectors.joining("&"))));
+ }
+
+ /**
+ * Create a single BidiWriteObjectRequest consisting of the same semantic meaning as if doing
+ * first then second.
+ *
+ * @throws IllegalArgumentException if both first and second have checksummedData
+ */
+ static BidiWriteObjectRequest concatenate(
+ BidiWriteObjectRequest first, BidiWriteObjectRequest second) {
+ checkArgument(
+ !(first.hasChecksummedData() && second.hasChecksummedData()),
+ "attempting to merge two requests that both specify checksummed_data");
+ BidiWriteObjectRequest.Builder b = first.toBuilder().mergeFrom(second);
+ long lwo = first.getWriteOffset();
+ long rwo = second.getWriteOffset();
+ if (first.hasChecksummedData()) {
+ int size = first.getChecksummedData().getContent().size();
+ checkArgument(
+ lwo + size == rwo,
+ "(leftWriteOffset + size == rightWriteOffset) (%s + %s == %s)",
+ lwo,
+ size,
+ rwo);
+ b.setWriteOffset(lwo);
+ } else {
+ b.setWriteOffset(rwo);
+ }
+
+ // finish_write implies flush & state_lookup. dedupe to avoid an extra incremental message
+ if (second.getFinishWrite() && (first.getFlush() || first.getStateLookup())) {
+ b.clearFlush().clearStateLookup();
+ }
+ return b.build();
+ }
+
+ @Nullable StorageException onResponse(BidiWriteObjectResponse response) {
+ return unimplemented();
+ }
+
+ State getState() {
+ return unimplemented();
+ }
+
+ @VisibleForTesting
+ @Nullable BidiWriteObjectRequest peekFirst() {
+ return unimplemented();
+ }
+
+ SettableApiFuture getResultFuture() {
+ return unimplemented();
+ }
+
+ void awaitState(State... state) throws InterruptedException {
+ unimplemented();
+ }
+
+ public void awaitTakeoverStateReconciliation(Runnable restart) {
+ unimplemented();
+ }
+
+ public void awaitAck(long writeOffset) throws InterruptedException {
+ unimplemented();
+ }
+
+ enum State {
+ INITIALIZING,
+ TAKEOVER,
+ RUNNING,
+ PENDING_RETRY,
+ RETRYING,
+ TERMINAL_SUCCESS,
+ TERMINAL_ERROR;
+
+ private static final State[] allNonTerminal =
+ new State[] {INITIALIZING, TAKEOVER, RUNNING, PENDING_RETRY, RETRYING};
+
+ boolean in(State... states) {
+ for (State state : states) {
+ if (state == this) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ private static T unimplemented() {
+ throw new IllegalStateException("not implemented");
+ }
+
+ abstract static class BaseUploadState extends BidiUploadState {
+
+ protected final BidiWriteObjectRequest initial;
+ protected final Supplier baseCallContext;
+ protected final ReentrantLock lock;
+ protected final Condition stateUpdated;
+ protected final Condition confirmedBytesUpdated;
+
+ /** The maximum number of bytes allowed to be enqueued in {@link #queue} across all messages. */
+ protected final long maxBytes;
+
+ protected final ArrayList queue;
+ protected final SettableApiFuture resultFuture;
+
+ /** The total number of bytes currently enqueued in {@link #queue} */
+ private long enqueuedBytes;
+
+ /** A value in the range of {@code -1 <= lastSentRequest && lastSentRequest < queue.size()} */
+ @VisibleForTesting int lastSentRequestIndex;
+
+ /** The minimum offset of bytes for those pending messages. */
+ protected long minByteOffset;
+
+ /**
+ * The number of bytes that have been "sent". This might also be named something like
+ * cumulativeWriteOffset.
+ */
+ protected long totalSentBytes;
+
+ protected @Nullable Crc32cLengthKnown cumulativeCrc32c;
+
+ /**
+ * Initially {@code -1} to signify the upload does not exist at all in the server, when the
+ * server responds successfully this will be updated to a value >= 0.
+ */
+ protected long confirmedBytes;
+
+ protected long generation;
+ protected @Nullable BidiWriteHandle writeHandle;
+ protected @Nullable String routingToken;
+ protected @NonNull State state;
+ protected @MonotonicNonNull BidiWriteObjectResponse lastResponseWithResource;
+ protected @Nullable State stateToReturnToAfterRetry;
+ protected boolean finalFlushSignaled;
+ protected boolean finalFlushSent;
+ protected boolean finishWriteSignaled;
+ protected boolean finishWriteSent;
+ protected @MonotonicNonNull OpenArguments lastOpenArguments;
+ protected @Nullable SettableApiFuture pendingReconciliation;
+
+ private BaseUploadState(
+ BidiWriteObjectRequest initial,
+ Supplier baseCallContext,
+ long maxBytes,
+ SettableApiFuture resultFuture,
+ @Nullable Crc32cLengthKnown initialCrc32c,
+ State startingState) {
+ this.initial = initial;
+ this.baseCallContext = baseCallContext;
+ this.resultFuture = resultFuture;
+ this.cumulativeCrc32c = initialCrc32c;
+ this.maxBytes = maxBytes;
+ this.queue = new ArrayList<>();
+ this.enqueuedBytes = 0;
+ this.lock = new ReentrantLock();
+ this.stateUpdated = lock.newCondition();
+ this.confirmedBytesUpdated = lock.newCondition();
+ this.lastSentRequestIndex = -1;
+ this.minByteOffset = 0;
+ this.totalSentBytes = 0;
+ this.confirmedBytes = -1;
+ this.state = startingState;
+ }
+
+ @Override
+ final State getState() {
+ lock.lock();
+ try {
+ return state;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final @Nullable Crc32cLengthKnown getCumulativeCrc32c() {
+ lock.lock();
+ try {
+ return cumulativeCrc32c;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final long getTotalSentBytes() {
+ lock.lock();
+ try {
+ return totalSentBytes;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final long getConfirmedBytes() {
+ lock.lock();
+ try {
+ return confirmedBytes;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final long availableCapacity() {
+ lock.lock();
+ try {
+ return maxBytes - enqueuedBytes;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final boolean offer(ChunkSegmenter.@NonNull ChunkSegment datum) {
+ lock.lock();
+ try {
+ requireNonNull(datum, "data must be non null");
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+ checkNotFinalizing();
+ ByteString b = datum.getB();
+ long availableCapacity = availableCapacity();
+ int size = b.size();
+ if (size <= availableCapacity) {
+ Crc32cLengthKnown crc32c = datum.getCrc32c();
+ ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
+ if (crc32c != null) {
+ checksummedData.setCrc32C(crc32c.getValue());
+ }
+ ChecksummedData built = checksummedData.build();
+ boolean offered =
+ internalOffer(
+ BidiWriteObjectRequest.newBuilder()
+ .setWriteOffset(totalSentBytes)
+ .setChecksummedData(built)
+ .build());
+ if (offered) {
+ cumulativeCrc32c = crc32cConcat(crc32c);
+ }
+ return offered;
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean finalFlush(long totalLength) {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+ checkNotFinalizing();
+ checkArgument(
+ totalLength == totalSentBytes,
+ "(totalLength == totalSentBytes) (%s == %s)",
+ totalLength,
+ totalSentBytes);
+
+ BidiWriteObjectRequest flush =
+ BidiWriteObjectRequest.newBuilder()
+ .setWriteOffset(totalLength)
+ .setFlush(true)
+ .setStateLookup(true)
+ .build();
+
+ BidiWriteObjectRequest currentLast = peekLast();
+ boolean equals = flush.equals(currentLast);
+ if (equals && finalFlushSignaled) {
+ return true;
+ } else if (equals && lastSentRequestIndex == queue.size() - 1) {
+ finalFlushSignaled = true;
+ finalFlushSent = true;
+ return true;
+ }
+
+ boolean offered = internalOffer(flush);
+ if (offered) {
+ finalFlushSignaled = true;
+ }
+ return offered;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final boolean offer(@NonNull BidiWriteObjectRequest e) {
+ lock.lock();
+ try {
+ requireNonNull(e, "e must be non null");
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+ if (e.hasChecksummedData()) {
+ checkNotFinalizing();
+ }
+ int size = e.getChecksummedData().getContent().size();
+ long availableCapacity = availableCapacity();
+ if (size > availableCapacity) {
+ return false;
+ }
+
+ checkArgument(
+ e.hasOneof(FIRST_MESSAGE_DESCRIPTOR) || e.getWriteOffset() == totalSentBytes,
+ "(write_offset == totalSentBytes) (%s == %s)",
+ e.getWriteOffset(),
+ totalSentBytes);
+ return internalOffer(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ protected void setConfirmedBytes(long newConfirmedBytes) {
+ this.confirmedBytes = newConfirmedBytes;
+ this.confirmedBytesUpdated.signalAll();
+ }
+
+ @Override
+ final void updateStateFromResponse(BidiWriteObjectResponse response) {
+ lock.lock();
+ try {
+ long persistedSize = -1;
+ if (response.hasPersistedSize()) {
+ persistedSize = response.getPersistedSize();
+ } else if (response.hasResource()) {
+ persistedSize = response.getResource().getSize();
+ lastResponseWithResource = response;
+ generation = lastResponseWithResource.getResource().getGeneration();
+ }
+ checkState(persistedSize > -1, "persistedSize > -1 (%s > -1)", persistedSize);
+ checkArgument(
+ persistedSize >= confirmedBytes,
+ "(persistedSize >= confirmedBytes) (%s >= %s)",
+ response,
+ confirmedBytes);
+ validateCurrentStateIsOneOf(
+ State.INITIALIZING, State.TAKEOVER, State.RUNNING, State.RETRYING);
+ routingToken = null;
+ // todo: test more permutations where this might be true
+ // 1. retry, object not yet created
+ if (state == State.INITIALIZING) {
+ setConfirmedBytes(persistedSize);
+ totalSentBytes = Math.max(totalSentBytes, persistedSize);
+ }
+ if (state == State.INITIALIZING || state == State.RETRYING) {
+ transitionTo(
+ stateToReturnToAfterRetry != null ? stateToReturnToAfterRetry : State.RUNNING);
+ }
+
+ boolean signalTerminalSuccess = false;
+ BidiWriteObjectRequest peek;
+ while ((peek = peekFirst()) != null) {
+ if (peek.hasChecksummedData()) {
+ int size = peek.getChecksummedData().getContent().size();
+ long endOffset = peek.getWriteOffset() + size;
+ if (endOffset <= persistedSize) {
+ poll();
+ setConfirmedBytes(endOffset);
+ enqueuedBytes -= size;
+ minByteOffset = peek.getWriteOffset();
+ } else {
+ break;
+ }
+ } else if (peek.hasOneof(FIRST_MESSAGE_DESCRIPTOR)) {
+ poll();
+ } else if (peek.getFlush()) {
+ if (finalFlushSent && persistedSize == totalSentBytes) {
+ setConfirmedBytes(persistedSize);
+ signalTerminalSuccess = true;
+ poll();
+ } else if (persistedSize >= peek.getWriteOffset()) {
+ setConfirmedBytes(persistedSize);
+ poll();
+ } else {
+ break;
+ }
+ } else if (peek.getFinishWrite()) {
+ checkState(
+ enqueuedBytes == 0,
+ "attempting to evict finish_write: true while bytes are still enqueued");
+ if (response.hasResource() && persistedSize == totalSentBytes) {
+ setConfirmedBytes(persistedSize);
+ if (response.getResource().hasFinalizeTime()) {
+ signalTerminalSuccess = true;
+ poll();
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ } else {
+ //noinspection DataFlowIssue
+ checkState(false, "peek = {%s}, response = {%s}", fmtProto(peek), fmtProto(response));
+ }
+ }
+
+ if (pendingReconciliation != null) {
+ pendingReconciliation.set(null);
+ pendingReconciliation = null;
+ }
+
+ if (signalTerminalSuccess && lastResponseWithResource != null) {
+ BidiWriteObjectResponse.Builder b = lastResponseWithResource.toBuilder();
+ b.getResourceBuilder().setSize(confirmedBytes);
+ b.getResourceBuilder().getChecksumsBuilder().clearMd5Hash().clearCrc32C();
+ if (cumulativeCrc32c != null) {
+ b.getResourceBuilder().getChecksumsBuilder().setCrc32C(cumulativeCrc32c.getValue());
+ }
+ BidiWriteObjectResponse updated = b.build();
+ resultFuture.set(updated);
+ terminalSuccess();
+ } else if (signalTerminalSuccess) {
+ checkState(false, "signalTerminalSuccess without prior resource response");
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final void updateFromRedirect(@NonNull BidiWriteObjectRedirectedError redirect) {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(
+ State.INITIALIZING, State.RUNNING, State.PENDING_RETRY, State.RETRYING);
+ if (redirect.hasWriteHandle()) {
+ this.writeHandle = redirect.getWriteHandle();
+ }
+ if (redirect.hasRoutingToken()) {
+ routingToken = redirect.getRoutingToken();
+ }
+ if (redirect.hasGeneration()) {
+ if (generation > 0) {
+ checkState(
+ generation == redirect.getGeneration(),
+ "Generation changed: (generation == redirect.getGeneration()) (%s == %s)",
+ generation,
+ redirect.getGeneration());
+ }
+ generation = redirect.getGeneration();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final void terminalError() {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+ transitionTo(State.TERMINAL_ERROR);
+ if (pendingReconciliation != null) {
+ pendingReconciliation.cancel(true);
+ }
+ stateUpdated.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void terminalSuccess() {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+ transitionTo(State.TERMINAL_SUCCESS);
+ stateUpdated.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final void pendingRetry() {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+ stateToReturnToAfterRetry = state;
+ transitionTo(State.PENDING_RETRY);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final void retrying() {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(State.PENDING_RETRY, State.INITIALIZING, State.TAKEOVER);
+ transitionTo(State.RETRYING);
+ lastSentRequestIndex = -1;
+ finishWriteSent = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final boolean isFinalizing() {
+ lock.lock();
+ try {
+ return finishWriteSignaled && finishWriteSent;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ ApiFuture beginReconciliation() {
+ lock.lock();
+ try {
+ if (pendingReconciliation == null) {
+ pendingReconciliation = SettableApiFuture.create();
+ }
+ return pendingReconciliation;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ final void sendVia(Consumer consumer) {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(
+ State.INITIALIZING, State.RUNNING, State.RETRYING, State.TAKEOVER);
+ BidiWriteObjectRequest prev = null;
+ int i = lastSentRequestIndex + 1;
+ for (; i < queue.size(); i++) {
+ BidiWriteObjectRequest m = queue.get(i);
+ lastSentRequestIndex = i;
+ if (state == State.RETRYING) {
+ prev = m;
+ break; // if retrying only send the first message
+ }
+
+ if (prev != null) {
+ // never compact bytes, purely for simplicity’s sake. ByteString won't copy when
+ // concatenating two values together, but there is a limit on how many bytes can be in
+ // an
+ // individual message, and it's much easier to not have to worry about all of that here.
+ // We're mainly wanting to ensure things like flush/finish are packed into the last data
+ // message, and the first data message is included with the initial request if no state
+ // reconciliation needs to take place.
+ if (prev.hasChecksummedData() && m.hasChecksummedData()) {
+ consumer.accept(prev);
+ prev = m;
+ } else {
+ prev = concatenate(prev, m);
+ }
+ } else {
+ prev = m;
+ }
+ }
+ if (prev != null) {
+ if (prev.getFinishWrite()) {
+ finishWriteSent = true;
+ } else if (prev.getFlush() && prev.getStateLookup() && finalFlushSignaled) {
+ finalFlushSent = true;
+ }
+ consumer.accept(prev);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void prepend(BidiWriteObjectRequest e) {
+ queue.add(0, e);
+ enqueuedBytes = enqueuedBytes + e.getChecksummedData().getContent().size();
+ }
+
+ private void append(BidiWriteObjectRequest e) {
+ queue.add(e);
+ enqueuedBytes = enqueuedBytes + e.getChecksummedData().getContent().size();
+ }
+
+ @Override
+ final @Nullable BidiWriteObjectRequest peekLast() {
+ lock.lock();
+ try {
+ int index = queue.size() - 1;
+ if (index < 0) {
+ return null;
+ }
+ return queue.get(index);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ @Override
+ final @Nullable BidiWriteObjectRequest peekFirst() {
+ lock.lock();
+ try {
+ if (queue.isEmpty()) {
+ return null;
+ }
+ return queue.get(0);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void poll() {
+ BidiWriteObjectRequest remove = queue.remove(0);
+ if (remove != null) {
+ lastSentRequestIndex = Math.max(lastSentRequestIndex - 1, -1);
+ }
+ }
+
+ protected final void transitionTo(State state) {
+ this.state = state;
+ stateUpdated.signalAll();
+ }
+
+ protected final void validateCurrentStateIsOneOf(State... allowed) {
+ checkState(
+ state.in(allowed),
+ "state mismatch. expected one of %s but is %s",
+ Arrays.toString(allowed),
+ state);
+ }
+
+ private void checkNotFinalizing() {
+ checkState(
+ !finishWriteSignaled,
+ "Attempting to append bytes even though finalization has previously been signaled.");
+ }
+
+ protected final boolean internalOffer(BidiWriteObjectRequest e) {
+ Consumer add = this::append;
+ if (e.hasOneof(FIRST_MESSAGE_DESCRIPTOR)) {
+ if (!queue.isEmpty() && queue.get(0).hasOneof(FIRST_MESSAGE_DESCRIPTOR)) {
+ poll(); // dequeue the existing first message
+ }
+ add = this::prepend;
+ }
+ if (e.getFinishWrite()) {
+ finishWriteSignaled = true;
+ }
+
+ if (e.hasChecksummedData() && !finishWriteSignaled) {
+ ChecksummedData checksummedData = e.getChecksummedData();
+ int size = checksummedData.getContent().size();
+ if (size <= availableCapacity()) {
+ totalSentBytes += size;
+ add.accept(e);
+ return true;
+ }
+ return false;
+ } else {
+ add.accept(e);
+ return true;
+ }
+ }
+
+ @Nullable
+ private Crc32cLengthKnown crc32cConcat(@Nullable Crc32cLengthKnown rhs) {
+ if (cumulativeCrc32c == null) {
+ return null;
+ }
+ requireNonNull(rhs, "rhs must be non null");
+ return cumulativeCrc32c.concat(rhs);
+ }
+
+ @Override
+ public SettableApiFuture getResultFuture() {
+ return resultFuture;
+ }
+
+ @Override
+ void awaitState(State... anyOf) throws InterruptedException {
+ lock.lock();
+ try {
+ ImmutableSet states = ImmutableSet.copyOf(anyOf);
+ while (!states.contains(this.state) && !stateUpdated.await(5, TimeUnit.MILLISECONDS)) {
+ if (resultFuture.isDone()) {
+ return;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void awaitTakeoverStateReconciliation(Runnable restart) {
+ try {
+ pendingRetry();
+ restart.run();
+ awaitState(State.RUNNING);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw StorageException.coalesce(e);
+ }
+ }
+
+ @Override
+ public void awaitAck(long writeOffset) throws InterruptedException {
+ lock.lock();
+ try {
+ while (confirmedBytes < writeOffset
+ && !confirmedBytesUpdated.await(5, TimeUnit.MILLISECONDS)) {
+ if (resultFuture.isDone()) {
+ return;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ abstract static class AppendableUploadState extends BaseUploadState {
+
+ private AppendableUploadState(
+ BidiWriteObjectRequest initial,
+ Supplier baseCallContext,
+ long maxBytes,
+ SettableApiFuture resultFuture,
+ @Nullable Crc32cLengthKnown initialCrc32c,
+ State startingState) {
+ super(initial, baseCallContext, maxBytes, resultFuture, initialCrc32c, startingState);
+ }
+
+ protected abstract String getBucket();
+
+ protected abstract BidiWriteObjectRequest.Builder getBuilder();
+
+ @Override
+ public @NonNull GrpcCallContext enqueueFirstMessageAndGetGrpcCallContext() {
+ super.lock.lock();
+ try {
+ if (!state.in(State.INITIALIZING, State.RETRYING, State.TAKEOVER)) {
+ return lastOpenArguments.getCtx();
+ }
+ ImmutableMap> xGoogRequestParams =
+ makeHeadersMap(
+ Stream.of(
+ "bucket=" + this.getBucket(),
+ "appendable=true",
+ routingToken != null ? "routing_token=" + routingToken : null));
+ GrpcCallContext context = baseCallContext.get().withExtraHeaders(xGoogRequestParams);
+
+ BidiWriteObjectRequest.Builder b = this.getBuilder();
+ if (state == State.RETRYING) {
+ b.setStateLookup(true);
+ }
+ BidiWriteObjectRequest req = b.build();
+ OpenArguments openArguments = new OpenArguments(req, context);
+ internalOffer(req);
+ lastOpenArguments = openArguments;
+ return openArguments.getCtx();
+ } finally {
+ super.lock.unlock();
+ }
+ }
+
+ @Override
+ @Nullable StorageException onResponse(BidiWriteObjectResponse response) {
+ lock.lock();
+ try {
+ validateCurrentStateIsOneOf(State.allNonTerminal);
+
+ if (response.hasWriteHandle()) {
+ this.writeHandle = response.getWriteHandle();
+ }
+
+ boolean incremental = !response.hasResource();
+ long persistedSize = -1;
+ if (response.hasPersistedSize()) {
+ persistedSize = response.getPersistedSize();
+ } else if (response.hasResource()) {
+ persistedSize = response.getResource().getSize();
+ }
+ checkState(persistedSize > -1, "persistedSize > -1 (%s > -1)", persistedSize);
+ if (state == State.TAKEOVER || stateToReturnToAfterRetry == State.TAKEOVER) {
+ totalSentBytes = persistedSize;
+ setConfirmedBytes(persistedSize);
+ if (response.hasResource()
+ && response.getResource().hasChecksums()
+ && response.getResource().getChecksums().hasCrc32C()) {
+ cumulativeCrc32c =
+ Crc32cValue.of(response.getResource().getChecksums().getCrc32C(), persistedSize);
+ }
+ updateStateFromResponse(response);
+ transitionTo(State.RUNNING);
+ return null;
+ }
+
+ long totalSentBytes = getTotalSentBytes();
+ long minWriteOffset = minByteOffset;
+ boolean finalizing = isFinalizing();
+
+ if (!finalizing && incremental) {
+ if (persistedSize == totalSentBytes) {
+ updateStateFromResponse(response);
+ } else if (persistedSize < totalSentBytes) {
+ updateStateFromResponse(response);
+ } else {
+ return err(UploadFailureScenario.SCENARIO_7, response);
+ }
+ } else if (finalizing && !incremental) {
+ if (persistedSize == totalSentBytes) {
+ updateStateFromResponse(response);
+ } else if (persistedSize < totalSentBytes) {
+ if (persistedSize > minWriteOffset) {
+ updateStateFromResponse(response);
+ } else if (lastResponseWithResource != null) {
+ return err(UploadFailureScenario.SCENARIO_4_1, response);
+ }
+ } else {
+ return err(UploadFailureScenario.SCENARIO_4_2, response);
+ }
+ } else if (!finalizing /* && !incremental*/) {
+ // generally the first response from the server
+ if (persistedSize <= totalSentBytes) {
+ updateStateFromResponse(response);
+ } else {
+ return err(UploadFailureScenario.SCENARIO_7, response);
+ }
+ } else /* (finalizing && incremental) */ {
+ // might happen if a `flush: true, state_lookup: true, finish_write: true`
+ if (persistedSize == totalSentBytes) {
+ updateStateFromResponse(response);
+ } else if (persistedSize < totalSentBytes) {
+ if (persistedSize > minWriteOffset) {
+ updateStateFromResponse(response);
+ } else if (lastResponseWithResource != null) {
+ return err(UploadFailureScenario.SCENARIO_3, response);
+ }
+ } else {
+ return err(UploadFailureScenario.SCENARIO_2, response);
+ }
+ }
+
+ return null;
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ static final class NewAppendableUploadState extends AppendableUploadState {
+ private final WriteObjectSpec spec;
+
+ private NewAppendableUploadState(
+ BidiWriteObjectRequest initial,
+ WriteObjectSpec spec,
+ Supplier baseCallContext,
+ long maxBytes,
+ SettableApiFuture resultFuture,
+ @Nullable Crc32cLengthKnown initialCrc32c) {
+ super(initial, baseCallContext, maxBytes, resultFuture, initialCrc32c, State.INITIALIZING);
+ this.spec = spec;
+ }
+
+ @Override
+ protected String getBucket() {
+ return spec.getResource().getBucket();
+ }
+
+ @Override
+ protected BidiWriteObjectRequest.Builder getBuilder() {
+ BidiWriteObjectRequest.Builder b = BidiWriteObjectRequest.newBuilder();
+ if (confirmedBytes >= 0) {
+ checkState(generation > 0, "generation > 0");
+
+ AppendObjectSpec.Builder aosb =
+ AppendObjectSpec.newBuilder()
+ .setBucket(spec.getResource().getBucket())
+ .setObject(spec.getResource().getName())
+ .setGeneration(generation);
+ if (spec.hasIfMetagenerationMatch()) {
+ aosb.setIfMetagenerationMatch(spec.getIfMetagenerationMatch());
+ }
+ if (spec.hasIfMetagenerationNotMatch()) {
+ aosb.setIfMetagenerationNotMatch(spec.getIfMetagenerationMatch());
+ }
+ ifNonNull(routingToken, aosb::setRoutingToken);
+ ifNonNull(writeHandle, aosb::setWriteHandle);
+ b.setAppendObjectSpec(aosb);
+ } else {
+ b.setWriteObjectSpec(spec);
+ }
+ return b;
+ }
+ }
+
+ static final class TakeoverAppendableUploadState extends AppendableUploadState {
+ private final AppendObjectSpec spec;
+
+ private TakeoverAppendableUploadState(
+ BidiWriteObjectRequest initial,
+ AppendObjectSpec spec,
+ Supplier baseCallContext,
+ long maxBytes,
+ SettableApiFuture resultFuture,
+ @Nullable Crc32cLengthKnown initialCrc32c) {
+ super(initial, baseCallContext, maxBytes, resultFuture, initialCrc32c, State.TAKEOVER);
+ this.spec = spec;
+ }
+
+ @Override
+ protected String getBucket() {
+ return spec.getBucket();
+ }
+
+ @Override
+ protected BidiWriteObjectRequest.Builder getBuilder() {
+ AppendObjectSpec.Builder aosb = spec.toBuilder();
+ ifNonNull(routingToken, aosb::setRoutingToken);
+ ifNonNull(writeHandle, aosb::setWriteHandle);
+ return BidiWriteObjectRequest.newBuilder().setAppendObjectSpec(aosb);
+ }
+ }
+
+ static final class OpenArguments {
+
+ private final BidiWriteObjectRequest req;
+ private final GrpcCallContext ctx;
+
+ private OpenArguments(BidiWriteObjectRequest req, GrpcCallContext ctx) {
+ this.req = req;
+ this.ctx = ctx;
+ }
+
+ public BidiWriteObjectRequest getReq() {
+ return req;
+ }
+
+ public GrpcCallContext getCtx() {
+ return ctx;
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java
new file mode 100644
index 0000000000..6da243e541
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java
@@ -0,0 +1,608 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.api.gax.rpc.BidiStreamingCallable;
+import com.google.api.gax.rpc.ClientStream;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.BidiUploadState.State;
+import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
+import com.google.cloud.storage.RetryContext.OnFailure;
+import com.google.cloud.storage.RetryContext.OnSuccess;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.storage.v2.BidiWriteObjectRedirectedError;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.BidiWriteObjectResponse;
+import com.google.storage.v2.ObjectChecksums;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+
+/**
+ * A class that helps tie together a {@link BidiUploadState}, {@link RetryContext} and underlying
+ * gRPC bidi stream.
+ *
+ *
This class helps transparently handle retries in the event an error is observed, and will
+ * handle redirect(s) if they occur, all without the need for the caller of this class to know about
+ * those things and the state need to worry about how retries will happen.
+ */
+final class BidiUploadStreamingStream {
+
+ private final BidiUploadState state;
+ private final BidiStreamingCallable write;
+ // private final UnaryCallable get;
+ private final ScheduledExecutorService executor;
+ private final RetryContext retryContext;
+ private final OnSuccess onSuccess;
+ private final OnFailure onFailure;
+ private final ReentrantLock lock;
+ private final int maxRedirectsAllowed;
+ private final AtomicInteger redirectCounter;
+
+ private volatile @Nullable StreamTuple stream;
+ private volatile @Nullable ApiFuture pendingReconciliation;
+
+ BidiUploadStreamingStream(
+ BidiUploadState state,
+ ScheduledExecutorService executor,
+ BidiStreamingCallable write,
+ int maxRedirectsAllowed,
+ RetryContext retryContext) {
+ this.state = state;
+ this.executor = executor;
+ this.write = write;
+ this.lock = new ReentrantLock();
+ this.retryContext = new StreamRetryContextDecorator(retryContext, lock, this::reset);
+ this.onSuccess = this::restart;
+ this.onFailure =
+ t -> {
+ SettableApiFuture resultFuture = state.getResultFuture();
+ if (!resultFuture.isDone()) {
+ this.state.terminalError();
+ BaseServiceException coalesced = StorageException.coalesce(t);
+ resultFuture.setException(coalesced);
+ }
+ };
+ this.maxRedirectsAllowed = maxRedirectsAllowed;
+ this.redirectCounter = new AtomicInteger();
+ }
+
+ public ApiFuture getResultFuture() {
+ return state.getResultFuture();
+ }
+
+ public boolean append(ChunkSegmenter.@NonNull ChunkSegment data) {
+ lock.lock();
+ try {
+ boolean offered = state.offer(data);
+ if (offered) {
+ internalSend();
+ }
+ return offered;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean appendAndFlush(ChunkSegmenter.@NonNull ChunkSegment data) {
+ lock.lock();
+ try {
+ boolean offered = state.offer(data);
+ if (offered) {
+ flush();
+ }
+ return offered;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean appendAndFinalize(ChunkSegmenter.@NonNull ChunkSegment data) {
+ lock.lock();
+ try {
+ boolean offered = state.offer(data);
+ if (offered) {
+ finishWrite(state.getTotalSentBytes());
+ }
+ return offered;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void flush() {
+ lock.lock();
+ try {
+ BidiWriteObjectRequest flush =
+ BidiWriteObjectRequest.newBuilder()
+ .setWriteOffset(state.getTotalSentBytes())
+ .setFlush(true)
+ .setStateLookup(true)
+ .build();
+ // if our flush is already enqueued, simply tick to make sure things are sent
+ if (flush.equals(state.peekLast())) {
+ internalSend();
+ return;
+ }
+ boolean offered = state.offer(flush);
+ if (offered) {
+ internalSend();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean finishWrite(long length) {
+ lock.lock();
+ try {
+ // if we're already finalizing, ack rather than enqueueing again
+ if (state.isFinalizing() && state.getTotalSentBytes() == length) {
+ return true;
+ }
+
+ BidiWriteObjectRequest.Builder b =
+ BidiWriteObjectRequest.newBuilder().setWriteOffset(length).setFinishWrite(true);
+ Crc32cLengthKnown cumulativeCrc32c = state.getCumulativeCrc32c();
+ if (cumulativeCrc32c != null) {
+ b.setObjectChecksums(
+ ObjectChecksums.newBuilder().setCrc32C(cumulativeCrc32c.getValue()).build());
+ }
+ BidiWriteObjectRequest msg = b.build();
+ boolean offer = state.offer(msg);
+ if (offer) {
+ internalSend();
+ }
+ return offer;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean closeStream(long length) {
+ lock.lock();
+ try {
+
+ boolean offer = state.finalFlush(length);
+ if (offer) {
+ internalSend();
+ }
+ return offer;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void sendClose() {
+ lock.lock();
+ try {
+ StreamTuple tmp = getStream();
+ if (tmp != null) {
+ tmp.closeSend();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void awaitTakeoverStateReconciliation() {
+ state.awaitTakeoverStateReconciliation(this::restart);
+ }
+
+ void awaitAckOf(long writeOffset) throws InterruptedException {
+ state.awaitAck(writeOffset);
+ }
+
+ /**
+ * It is possible for this value to change after reading, however it is guaranteed that the amount
+ * of available capacity will only ever increase.
+ *
+ *
The only way this value is impacted by a background thread is if buffer space is released.
+ * Buffer consumption can only happen from the same thread that would invoke this method.
+ */
+ long availableCapacity() {
+ return state.availableCapacity();
+ }
+
+ /** expected to be called from a background thread provided by {@link #executor}. */
+ @VisibleForTesting
+ void restart() {
+ lock.lock();
+ try {
+ checkState(stream == null, "attempting to restart stream when stream is already active");
+ state.retrying();
+ ApiFuture reconciliation = state.beginReconciliation();
+ // read the current volatile value
+ ApiFuture tmpPendingReconciliation = pendingReconciliation;
+ StreamTuple tmp = initStreamTuple();
+ state.sendVia(tmp);
+ // Intentionally using reference equality.
+ // Only register the callback if we haven't previously registered it.
+ // We want to avoid any error/cancellation on a long-running reconciliation being registered
+ // in retry context multiple times.
+ // Unfortunately, ApiFuture doesn't provide "isCallbackRegistered" so we need to track this
+ // ourselves.
+ if (reconciliation != tmpPendingReconciliation) {
+ ApiFutures.addCallback(
+ reconciliation,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ lock.lock();
+ try {
+ BidiUploadStreamingStream.this.pendingReconciliation = null;
+ } finally {
+ lock.unlock();
+ }
+ retryContext.recordError(t, onSuccess, onFailure);
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ lock.lock();
+ try {
+ BidiUploadStreamingStream.this.pendingReconciliation = null;
+ } finally {
+ lock.unlock();
+ }
+ // when the reconciliation completes, trigger sending the rest of the messages
+ // that might be in the queue.
+ // re-get the stream so that if a retry is in progress we don't attempt to send
+ // to a stream that was broken after reconciliation.
+ StreamTuple tmp = getStream();
+ if (tmp != null) {
+ state.sendVia(tmp);
+ }
+ }
+ },
+ executor);
+ pendingReconciliation = reconciliation;
+ }
+ stream = tmp;
+ } catch (Throwable t) {
+ retryContext.recordError(t, onSuccess, onFailure);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ void reset() {
+ lock.lock();
+ try {
+ StreamTuple tmp = stream;
+ if (tmp != null) {
+ tmp.in.flagTombstoned();
+ tmp.closeSend();
+ stream = null;
+ state.pendingRetry();
+ }
+ } catch (Throwable t) {
+ // if any exception is thrown, catch it and funnel it into retryContext so that it is surfaced
+ // to the application.
+ retryContext.recordError(t, onSuccess, onFailure);
+ // Then throw it to prevent the current thread from running any following steps. Not ideal,
+ // but this can execute on a background thread that the application will never see.
+ // throw t;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private @Nullable StreamTuple getStream() {
+ if (stream == null && state.getState() == State.INITIALIZING) {
+ stream = initStreamTuple();
+ }
+ return stream;
+ }
+
+ private StreamTuple initStreamTuple() {
+ GrpcCallContext grpcCallContext = state.enqueueFirstMessageAndGetGrpcCallContext();
+ StreamingResponseObserver streamResponseObserver =
+ new StreamingResponseObserver(state, retryContext, onSuccess, onFailure);
+ RedirectHandlingResponseObserver responseObserver =
+ new RedirectHandlingResponseObserver(
+ state,
+ streamResponseObserver,
+ redirectCounter,
+ maxRedirectsAllowed,
+ this::reset,
+ () -> executor.execute(this::restart));
+ ClientStream clientStream =
+ write.splitCall(responseObserver, grpcCallContext);
+ GracefulOutboundStream out = new GracefulOutboundStream(clientStream);
+
+ return new StreamTuple(out, responseObserver);
+ }
+
+ private void internalSend() {
+ StreamTuple tmp = getStream();
+ if (tmp != null) {
+ state.sendVia(tmp);
+ }
+ }
+
+ private static final class StreamTuple implements Consumer {
+ private final ClientStream out;
+ private final RedirectHandlingResponseObserver in;
+
+ StreamTuple(ClientStream out, RedirectHandlingResponseObserver in) {
+ this.out = out;
+ this.in = in;
+ }
+
+ @Override
+ public void accept(BidiWriteObjectRequest bidiWriteObjectRequest) {
+ out.send(bidiWriteObjectRequest);
+ }
+
+ public void closeSend() {
+ in.flagTombstoned();
+ out.closeSend();
+ }
+ }
+
+ static final class StreamingResponseObserver
+ implements ResponseObserver {
+
+ private final BidiUploadState state;
+ private final RetryContext retryContext;
+ private final OnSuccess onSuccess;
+ private final OnFailure onFailure;
+
+ @MonotonicNonNull private StreamController controller;
+
+ StreamingResponseObserver(
+ BidiUploadState state,
+ RetryContext retryContext,
+ OnSuccess onSuccess,
+ OnFailure onFailure) {
+ this.state = state;
+ this.retryContext = retryContext;
+ this.onSuccess = onSuccess;
+ this.onFailure = onFailure;
+ }
+
+ @EnsuresNonNull("controller")
+ @Override
+ public void onStart(StreamController controller) {
+ this.controller = controller;
+ controller.disableAutoInboundFlowControl();
+ controller.request(1);
+ }
+
+ @RequiresNonNull("controller")
+ @Override
+ public void onResponse(BidiWriteObjectResponse response) {
+ try {
+ controller.request(1);
+ @Nullable StorageException se = state.onResponse(response);
+ if (se != null) {
+ retryContext.recordError(se, onSuccess, onFailure);
+ }
+ } catch (Throwable t) {
+ // catch an error that might happen while processing and forward it to our retry context
+ retryContext.recordError(t, onSuccess, onFailure);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ retryContext.recordError(t, onSuccess, onFailure);
+ }
+
+ @Override
+ public void onComplete() {
+ // ignore
+ }
+ }
+
+ static final class RedirectHandlingResponseObserver
+ implements ResponseObserver {
+ private final BidiUploadState state;
+ private final ResponseObserver delegate;
+ private final AtomicInteger redirectCounter;
+ private final int maxRedirectsAllowed;
+ private final Runnable beforeRedirect;
+ private final Runnable onRedirect;
+
+ private volatile boolean tombstoned;
+
+ RedirectHandlingResponseObserver(
+ BidiUploadState state,
+ ResponseObserver delegate,
+ AtomicInteger redirectCounter,
+ int maxRedirectsAllowed,
+ Runnable beforeRedirect,
+ Runnable onRedirect) {
+ this.state = state;
+ this.delegate = delegate;
+ this.redirectCounter = redirectCounter;
+ this.maxRedirectsAllowed = maxRedirectsAllowed;
+ this.beforeRedirect = beforeRedirect;
+ this.onRedirect = onRedirect;
+ this.tombstoned = false;
+ }
+
+ /**
+ * mark this observer instance as tombstoned, this will cause it to ignore any invocations of
+ * its methods.
+ *
+ *
When we are going to retry a client detected error instead of a server detected one, we
+ * want to effectively ignore any following message that might already be inflight from the
+ * server.
+ */
+ void flagTombstoned() {
+ tombstoned = true;
+ }
+
+ @Override
+ public void onStart(StreamController controller) {
+ if (tombstoned) {
+ return;
+ }
+ delegate.onStart(controller);
+ }
+
+ @Override
+ public void onResponse(BidiWriteObjectResponse response) {
+ if (tombstoned) {
+ return;
+ }
+ redirectCounter.set(0);
+ delegate.onResponse(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (tombstoned) {
+ return;
+ }
+ BidiWriteObjectRedirectedError error = GrpcUtils.getBidiWriteObjectRedirectedError(t);
+ if (error == null) {
+ delegate.onError(t);
+ return;
+ }
+ int redirectCount = redirectCounter.incrementAndGet();
+ if (redirectCount > maxRedirectsAllowed) {
+ // attach the fact we're ignoring the redirect to the original exception as a suppressed
+ // Exception. The lower level handler can then perform its usual handling, but if things
+ // bubble all the way up to the invoker we'll be able to see it in a bug report.
+ t.addSuppressed(new MaxRedirectsExceededException(maxRedirectsAllowed, redirectCount));
+ delegate.onError(t);
+ return;
+ }
+ beforeRedirect.run();
+ state.updateFromRedirect(error);
+ onRedirect.run();
+ }
+
+ @Override
+ public void onComplete() {
+ if (tombstoned) {
+ return;
+ }
+ delegate.onComplete();
+ }
+ }
+
+ /**
+ * Prevent "already half-closed" if we previously called onComplete but then detect an error and
+ * call onError
+ */
+ private static final class GracefulOutboundStream
+ implements ClientStream {
+
+ private final ClientStream delegate;
+ private volatile boolean closing;
+
+ private GracefulOutboundStream(ClientStream delegate) {
+ this.delegate = delegate;
+ this.closing = false;
+ }
+
+ @Override
+ public boolean isSendReady() {
+ return delegate.isSendReady();
+ }
+
+ @Override
+ public void send(BidiWriteObjectRequest request) {
+ delegate.send(request);
+ }
+
+ @Override
+ public void closeSendWithError(Throwable t) {
+ if (closing) {
+ return;
+ }
+ closing = true;
+ delegate.closeSendWithError(t);
+ }
+
+ @Override
+ public void closeSend() {
+ if (closing) {
+ return;
+ }
+ closing = true;
+ delegate.closeSend();
+ }
+ }
+
+ /**
+ * Decorate a RetryContext to allow observing the invocation of {@link #recordError(Throwable,
+ * OnSuccess, OnFailure)}. This allows us to clear out the pending stream before a retry.
+ */
+ @VisibleForTesting
+ static final class StreamRetryContextDecorator implements RetryContext {
+ private final RetryContext retryContext;
+ private final ReentrantLock lock;
+ private final Runnable onRecordError;
+
+ @VisibleForTesting
+ StreamRetryContextDecorator(
+ RetryContext retryContext, ReentrantLock lock, Runnable onRecordError) {
+ this.retryContext = retryContext;
+ this.lock = lock;
+ this.onRecordError = onRecordError;
+ }
+
+ @Override
+ public boolean inBackoff() {
+ return retryContext.inBackoff();
+ }
+
+ @Override
+ public void reset() {
+ retryContext.reset();
+ }
+
+ @Override
+ public void recordError(
+ T t, OnSuccess onSuccess, OnFailure onFailure) {
+ lock.lock();
+ try {
+ try {
+ onRecordError.run();
+ } catch (Throwable tt) {
+ t.addSuppressed(tt);
+ onFailure.onFailure(t);
+ return;
+ }
+ retryContext.recordError(t, onSuccess, onFailure);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java
index b79a290969..056f665ab6 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java
@@ -23,6 +23,7 @@
import com.google.cloud.storage.Storage.BlobWriteOption;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;
@@ -90,6 +91,8 @@ public interface BlobAppendableUpload extends BlobWriteSession {
*
This interface allows writing bytes to an Appendable Upload, and provides methods to close
* this channel -- optionally finalizing the upload.
*
+ *
The {@link #write(ByteBuffer)} method of this channel is non-blocking.
+ *
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@@ -97,7 +100,47 @@ public interface BlobAppendableUpload extends BlobWriteSession {
interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
/**
- * Finalize the upload and close this instance to further {@link #write(ByteBuffer)}ing. This
+ * This method is non-blocking
+ *
+ *
Consume as many bytes as can fit in the underlying outbound queue. The size of the
+ * outbound queue is determined from {@link BlobAppendableUploadConfig#getFlushPolicy()}{@code
+ * .}{@link FlushPolicy#getMaxPendingBytes() getMaxPendingBytes()}. If the outbound queue is
+ * full, and can not fit more bytes, this method will return 0.
+ *
+ *
If your application needs to empty its ByteBuffer before progressing, use our helper
+ * method {@link StorageChannelUtils#blockingEmptyTo(ByteBuffer, WritableByteChannel)} like so:
+ *
+ *
+ *
+ * @param src The buffer from which bytes are to be retrieved
+ * @return The number of bytes written, possibly zero
+ * @throws ClosedChannelException If this channel is closed
+ * @throws IOException If some other I/O error occurs
+ */
+ @Override
+ int write(ByteBuffer src) throws IOException;
+
+ /**
+ * This method is blocking
+ *
+ *
Block the invoking thread, waiting until the number of bytes written so far has been
+ * acknowledged by Google Cloud Storage.
+ *
+ * @throws IOException if an error happens while waiting for the flush to complete
+ * @throws java.io.InterruptedIOException if the current thread is interrupted while waiting
+ * @since 2.56.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ void flush() throws IOException;
+
+ /**
+ * This method is blocking
+ *
+ *
Finalize the upload and close this instance to further {@link #write(ByteBuffer)}ing. This
* will close any underlying stream and release any releasable resources once out of scope.
*
*
Once this method is called, and returns no more writes to the object will be allowed by
@@ -116,8 +159,11 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
void finalizeAndClose() throws IOException;
/**
- * Close this instance to further {@link #write(ByteBuffer)}ing without finalizing the upload.
- * This will close any underlying stream and release any releasable resources once out of scope.
+ * This method is blocking
+ *
+ *
Close this instance to further {@link #write(ByteBuffer)}ing without finalizing the
+ * upload. This will close any underlying stream and release any releasable resources once out
+ * of scope.
*
*
This method, {@link AppendableUploadWriteableByteChannel#finalizeAndClose()} and {@link
* AppendableUploadWriteableByteChannel#close()} are mutually exclusive. If one of the other
@@ -133,7 +179,9 @@ interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
void closeWithoutFinalizing() throws IOException;
/**
- * Close this instance to further {@link #write(ByteBuffer)}ing.
+ * This method is blocking
+ *
+ *
Close this instance to further {@link #write(ByteBuffer)}ing.
*
*
Whether the upload is finalized during this depends on the {@link
* BlobAppendableUploadConfig#getCloseAction()} provided to create the {@link
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java
index ae95356d74..4cd51c79fb 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java
@@ -16,24 +16,26 @@
package com.google.cloud.storage;
-import static com.google.cloud.storage.ByteSizeConstants._256KiB;
import static java.util.Objects.requireNonNull;
+import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
-import com.google.api.core.InternalApi;
-import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
-import com.google.api.gax.rpc.AbortedException;
-import com.google.api.gax.rpc.ApiException;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.storage.BidiUploadState.AppendableUploadState;
+import com.google.cloud.storage.BidiUploadState.TakeoverAppendableUploadState;
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
-import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.storage.v2.BidiWriteObjectResponse;
-import com.google.storage.v2.Object;
+import com.google.storage.v2.ServiceConstants.Values;
+import java.util.Objects;
+import java.util.function.BiFunction;
import javax.annotation.concurrent.Immutable;
/**
@@ -51,26 +53,24 @@ public final class BlobAppendableUploadConfig {
private static final BlobAppendableUploadConfig INSTANCE =
new BlobAppendableUploadConfig(
- FlushPolicy.minFlushSize(_256KiB),
- Hasher.enabled(),
- CloseAction.CLOSE_WITHOUT_FINALIZING);
+ FlushPolicy.minFlushSize(), CloseAction.CLOSE_WITHOUT_FINALIZING, 3);
private final FlushPolicy flushPolicy;
- private final Hasher hasher;
private final CloseAction closeAction;
+ private final int maxRedirectsAllowed;
private BlobAppendableUploadConfig(
- FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction) {
+ FlushPolicy flushPolicy, CloseAction closeAction, int maxRedirectsAllowed) {
this.flushPolicy = flushPolicy;
- this.hasher = hasher;
this.closeAction = closeAction;
+ this.maxRedirectsAllowed = maxRedirectsAllowed;
}
/**
* The {@link FlushPolicy} which will be used to determine when and how many bytes to flush to
* GCS.
*
- *
Default: {@link FlushPolicy#minFlushSize()}
*
* @see #withFlushPolicy(FlushPolicy)
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
@@ -83,7 +83,7 @@ public FlushPolicy getFlushPolicy() {
/**
* Return an instance with the {@code FlushPolicy} set to be the specified value.
*
- *
Default: {@link FlushPolicy#minFlushSize()}
*
* @see #getFlushPolicy()
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
@@ -94,7 +94,7 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
if (this.flushPolicy.equals(flushPolicy)) {
return this;
}
- return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
+ return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed);
}
/**
@@ -112,8 +112,9 @@ public CloseAction getCloseAction() {
}
/**
- * Return an instance with the {@code CloseAction} set to be the specified value. Default:
- * {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
+ * Return an instance with the {@code CloseAction} set to be the specified value.
+ *
+ *
Default: {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
*
* @see #getCloseAction()
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
@@ -124,45 +125,66 @@ public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
if (this.closeAction == closeAction) {
return this;
}
- return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
+ return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed);
}
/**
- * Whether crc32c validation will be performed for bytes returned by Google Cloud Storage
+ * The {@code maxRedirectsAllowed} set to be the specified value.
*
- *
Default: {@code true}
+ *
Default: 3
*
- * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ * @see #withMaxRedirectsAllowed(int)
+ * @since 2.56.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
- boolean getCrc32cValidationEnabled() {
- return Hasher.enabled().equals(hasher);
+ int getMaxRedirectsAllowed() {
+ return maxRedirectsAllowed;
}
/**
- * Return an instance with crc32c validation enabled based on {@code enabled}.
+ * Return an instance with the {@code maxRedirectsAllowed} set to be the specified value.
*
- *
Default: {@code true}
+ *
Default: 3
*
- * @param enabled Whether crc32c validation will be performed for bytes returned by Google Cloud
- * Storage
- * @since 2.51.0 This new api is in preview and is subject to breaking changes.
+ * @see #getMaxRedirectsAllowed()
+ * @since 2.56.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
- BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
- if (enabled && Hasher.enabled().equals(hasher)) {
- return this;
- } else if (!enabled && Hasher.noop().equals(hasher)) {
+ BlobAppendableUploadConfig withMaxRedirectsAllowed(int maxRedirectsAllowed) {
+ Preconditions.checkArgument(
+ maxRedirectsAllowed >= 0, "maxRedirectsAllowed >= 0 (%s >= 0)", maxRedirectsAllowed);
+ if (this.maxRedirectsAllowed == maxRedirectsAllowed) {
return this;
}
- return new BlobAppendableUploadConfig(
- flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction);
+ return new BlobAppendableUploadConfig(flushPolicy, closeAction, maxRedirectsAllowed);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof BlobAppendableUploadConfig)) {
+ return false;
+ }
+ BlobAppendableUploadConfig that = (BlobAppendableUploadConfig) o;
+ return maxRedirectsAllowed == that.maxRedirectsAllowed
+ && Objects.equals(flushPolicy, that.flushPolicy)
+ && closeAction == that.closeAction;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(flushPolicy, closeAction, maxRedirectsAllowed);
}
- /** Never to be made public until {@link Hasher} is public */
- @InternalApi
- Hasher getHasher() {
- return hasher;
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("flushPolicy", flushPolicy)
+ .add("closeAction", closeAction)
+ .add("maxRedirectsAllowed", maxRedirectsAllowed)
+ .toString();
}
/**
@@ -217,55 +239,65 @@ public enum CloseAction {
}
BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts opts) {
- boolean takeOver = info.getGeneration() != null;
- BidiWriteObjectRequest req =
- takeOver
- ? storage.getBidiWriteObjectRequestForTakeover(info, opts)
- : storage.getBidiWriteObjectRequest(info, opts);
-
- BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
-
+ long maxPendingBytes = this.getFlushPolicy().getMaxPendingBytes();
+ AppendableUploadState state = storage.getAppendableState(info, opts, maxPendingBytes);
WritableByteChannelSession
build =
- ResumableMedia.gapic()
- .write()
- .bidiByteChannel(storage.storageClient.bidiWriteObjectCallable())
- .setHasher(this.getHasher())
- .setByteStringStrategy(ByteStringStrategy.copy())
- .appendable()
- .withRetryConfig(
- storage.retrier.withAlg(
- new BasicResultRetryAlgorithm