Skip to content

Commit 6853eb7

Browse files
committed
chore: update BidiUploadStreamingStream to avoid enqueuing multiple equivalent flushes
If the downstream queue doesn't have enough space to accept the message, we can trigger a flush to ensure some progress is made. However, if the attempted write is not consumed multiple times we want to avoid enqueueing the standalone flush multiple times. Flag the rewindable content as dirty so the rewind will work when only partial consumption happens.
1 parent 1a1f879 commit 6853eb7

File tree

5 files changed

+114
-2
lines changed

5 files changed

+114
-2
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiAppendableUnbufferedWritableByteChannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) th
128128
if (data.length == 0) {
129129
return 0;
130130
}
131+
// we consumed some bytes from srcs, flag our content as dirty since we aren't writing
132+
// those bytes to implicitly flag as dirty.
133+
rewindableContent.flagDirty();
131134

132135
long bytesConsumed = 0;
133136
for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) {

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiUploadStreamingStream.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,17 @@ public boolean appendAndFinalize(ChunkSegmenter.@NonNull ChunkSegment data) {
142142
public void flush() {
143143
lock.lock();
144144
try {
145-
// TODO: debounce this to happen only every 8MiB or so.
146-
// we want to avoid flush: true, state_lookup: true for every single message
147145
BidiWriteObjectRequest flush =
148146
BidiWriteObjectRequest.newBuilder()
149147
.setWriteOffset(state.getTotalSentBytes())
150148
.setFlush(true)
151149
.setStateLookup(true)
152150
.build();
151+
// if our flush is already enqueued, simply tick to make sure things are sent
152+
if (flush.equals(state.peekLast())) {
153+
internalSend();
154+
return;
155+
}
153156
boolean offered = state.offer(flush);
154157
if (offered) {
155158
internalSend();

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
122122
if (data.length == 0) {
123123
return 0;
124124
}
125+
// we consumed some bytes from srcs, flag our content as dirty since we aren't writing
126+
// those bytes to implicitly flag as dirty.
127+
content.flagDirty();
125128

126129
List<WriteObjectRequest> messages = new ArrayList<>();
127130

google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ private RewindableContent() {
4848

4949
abstract long writeTo(GatheringByteChannel gbc) throws IOException;
5050

51+
abstract void flagDirty();
52+
5153
@Override
5254
public final boolean retrySupported() {
5355
return false;
@@ -106,6 +108,9 @@ long writeTo(GatheringByteChannel gbc) {
106108

107109
@Override
108110
protected void rewindTo(long offset) {}
111+
112+
@Override
113+
void flagDirty() {}
109114
}
110115

111116
private static final class PathRewindableContent extends RewindableContent {
@@ -157,6 +162,9 @@ long writeTo(GatheringByteChannel gbc) throws IOException {
157162
return ByteStreams.copy(in, gbc);
158163
}
159164
}
165+
166+
@Override
167+
void flagDirty() {}
160168
}
161169

162170
private static final class ByteBufferContent extends RewindableContent {
@@ -247,5 +255,10 @@ void rewindTo(long offset) {
247255
}
248256
this.offset = offset;
249257
}
258+
259+
@Override
260+
void flagDirty() {
261+
this.dirty = true;
262+
}
250263
}
251264
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.TestUtils.assertAll;
20+
import static com.google.common.truth.Truth.assertThat;
21+
22+
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
23+
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
24+
import com.google.cloud.storage.TransportCompatibility.Transport;
25+
import com.google.cloud.storage.it.ChecksummedTestContent;
26+
import com.google.cloud.storage.it.runner.StorageITRunner;
27+
import com.google.cloud.storage.it.runner.annotations.Backend;
28+
import com.google.cloud.storage.it.runner.annotations.BucketFixture;
29+
import com.google.cloud.storage.it.runner.annotations.BucketType;
30+
import com.google.cloud.storage.it.runner.annotations.CrossRun;
31+
import com.google.cloud.storage.it.runner.annotations.Inject;
32+
import com.google.cloud.storage.it.runner.registry.Generator;
33+
import java.nio.ByteBuffer;
34+
import java.nio.channels.WritableByteChannel;
35+
import java.util.UUID;
36+
import java.util.concurrent.TimeUnit;
37+
import org.junit.Test;
38+
import org.junit.runner.RunWith;
39+
40+
@RunWith(StorageITRunner.class)
41+
@CrossRun(
42+
backends = {Backend.PROD},
43+
transports = Transport.GRPC)
44+
public final class ITBidiAppendableUnbufferedWritableByteChannelTest {
45+
46+
@Inject public Generator generator;
47+
48+
@Inject public Storage storage;
49+
50+
@Inject
51+
@BucketFixture(BucketType.RAPID)
52+
public BucketInfo bucket;
53+
54+
@Inject public Backend backend;
55+
56+
@Test
57+
public void nonBufferAlignedWritesLeaveBuffersInTheCorrectState() throws Exception {
58+
BlobId bid = BlobId.of(bucket.getName(), UUID.randomUUID().toString());
59+
BlobAppendableUploadConfig config =
60+
BlobAppendableUploadConfig.of()
61+
.withFlushPolicy(FlushPolicy.minFlushSize(8 * 1024).withMaxPendingBytes(16 * 1024))
62+
.withCloseAction(CloseAction.CLOSE_WITHOUT_FINALIZING);
63+
ChecksummedTestContent ctc = ChecksummedTestContent.gen(16 * 1024 + 5);
64+
BlobAppendableUpload upload =
65+
storage.blobAppendableUpload(BlobInfo.newBuilder(bid).build(), config);
66+
try (AppendableUploadWriteableByteChannel channel = upload.open()) {
67+
// enqueue 4 bytes, this makes it so the following 8K writes don't evenly fit
68+
checkedEmptyTo(ctc.slice(0, 4).asByteBuffer(), channel);
69+
checkedEmptyTo(ctc.slice(4, 8192).asByteBuffer(), channel);
70+
checkedEmptyTo(ctc.slice(4 + 8192, 8192).asByteBuffer(), channel);
71+
checkedEmptyTo(ctc.slice(4 + 8192 + 8192, 1).asByteBuffer(), channel);
72+
}
73+
BlobInfo done1 = upload.getResult().get(5, TimeUnit.SECONDS);
74+
assertThat(done1.getSize()).isEqualTo(ctc.length());
75+
assertThat(done1.getCrc32c()).isEqualTo(Utils.crc32cCodec.encode(ctc.getCrc32c()));
76+
}
77+
78+
private static int checkedEmptyTo(ByteBuffer buf, WritableByteChannel c) throws Exception {
79+
int remaining = buf.remaining();
80+
int position = buf.position();
81+
int remaining1 = buf.remaining();
82+
int written = StorageChannelUtils.blockingEmptyTo(buf, c);
83+
assertAll(
84+
() -> assertThat(written).isEqualTo(position + remaining1),
85+
() -> assertThat(buf.position()).isEqualTo(position + written),
86+
() -> assertThat(buf.remaining()).isEqualTo(remaining1 - written));
87+
assertThat(written).isEqualTo(remaining);
88+
return written;
89+
}
90+
}

0 commit comments

Comments
 (0)