Skip to content

Commit 094040c

Browse files
committed
chore: make DefaultBufferedWritableByteChannel capable of being non-blocking
1 parent 70b6b78 commit 094040c

8 files changed

Lines changed: 274 additions & 16 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectT
272272
c = new BidiAppendableUnbufferedWritableByteChannel(stream, chunkSegmenter, 0);
273273
}
274274
return new AppendableObjectBufferedWritableByteChannel(
275-
flushPolicy.createBufferedChannel(c),
275+
flushPolicy.createBufferedChannel(c, /* blocking= */ false),
276276
c,
277277
this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING);
278278
},

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.common.base.Preconditions.checkState;
20+
1921
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
2022
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2123
import java.io.IOException;
@@ -59,10 +61,17 @@ final class DefaultBufferedWritableByteChannel implements BufferedWritableByteCh
5961
private final BufferHandle handle;
6062

6163
private final UnbufferedWritableByteChannel channel;
64+
private final boolean blocking;
6265

6366
DefaultBufferedWritableByteChannel(BufferHandle handle, UnbufferedWritableByteChannel channel) {
67+
this(handle, channel, true);
68+
}
69+
70+
DefaultBufferedWritableByteChannel(
71+
BufferHandle handle, UnbufferedWritableByteChannel channel, boolean blocking) {
6472
this.handle = handle;
6573
this.channel = channel;
74+
this.blocking = blocking;
6675
}
6776

6877
@SuppressWarnings("UnnecessaryLocalVariable")
@@ -110,6 +119,7 @@ public int write(ByteBuffer src) throws IOException {
110119
Buffers.flip(buffer);
111120
ByteBuffer[] srcs = {buffer, buf};
112121
long write = channel.write(srcs);
122+
checkState(write >= 0, "write >= 0 (%s > 0)", write);
113123
if (write == capacity) {
114124
// we successfully wrote all the bytes we wanted to
115125
Buffers.clear(buffer);
@@ -131,23 +141,36 @@ public int write(ByteBuffer src) throws IOException {
131141
Buffers.position(src, srcPosition + sliceWritten);
132142
bytesConsumed += sliceWritten;
133143
}
144+
145+
if (!blocking) {
146+
break;
147+
}
134148
}
135149
} else {
136150
// no enqueued data and src is at least as large as our buffer, see if we can simply write
137151
// the provided src or a slice of it since our buffer is empty
138152
if (bufferRemaining == srcRemaining) {
139153
// the capacity of buffer and the bytes remaining in src are the same, directly
140154
// write src
141-
bytesConsumed += channel.write(src);
155+
int write = channel.write(src);
156+
checkState(write >= 0, "write >= 0 (%s > 0)", write);
157+
bytesConsumed += write;
158+
if (write < srcRemaining && !blocking) {
159+
break;
160+
}
142161
} else {
143162
// the src provided is larger than our buffer. rather than copying into the buffer, simply
144163
// write a slice
145164
ByteBuffer slice = src.slice();
146165
Buffers.limit(slice, bufferRemaining);
147166
int write = channel.write(slice);
167+
checkState(write >= 0, "write >= 0 (%s > 0)", write);
148168
int newPosition = srcPosition + write;
149169
Buffers.position(src, newPosition);
150170
bytesConsumed += write;
171+
if (write < bufferRemaining && !blocking) {
172+
break;
173+
}
151174
}
152175
}
153176
}

google-cloud-storage/src/main/java/com/google/cloud/storage/FlushPolicy.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public static MinFlushSizeFlushPolicy minFlushSize(int minFlushSize) {
8686
}
8787

8888
abstract BufferedWritableByteChannel createBufferedChannel(
89-
UnbufferedWritableByteChannel unbuffered);
89+
UnbufferedWritableByteChannel unbuffered, boolean blocking);
9090

9191
abstract long getMaxPendingBytes();
9292

@@ -155,9 +155,10 @@ public MaxFlushSizeFlushPolicy withMaxFlushSize(int maxFlushSize) {
155155
}
156156

157157
@Override
158-
BufferedWritableByteChannel createBufferedChannel(UnbufferedWritableByteChannel unbuffered) {
158+
BufferedWritableByteChannel createBufferedChannel(
159+
UnbufferedWritableByteChannel unbuffered, boolean blocking) {
159160
return new DefaultBufferedWritableByteChannel(
160-
BufferHandle.allocate(maxFlushSize), unbuffered);
161+
BufferHandle.allocate(maxFlushSize), unbuffered, blocking);
161162
}
162163

163164
@Override
@@ -264,9 +265,10 @@ public MinFlushSizeFlushPolicy withMaxPendingBytes(long maxPendingBytes) {
264265
}
265266

266267
@Override
267-
BufferedWritableByteChannel createBufferedChannel(UnbufferedWritableByteChannel unbuffered) {
268+
BufferedWritableByteChannel createBufferedChannel(
269+
UnbufferedWritableByteChannel unbuffered, boolean blocking) {
268270
return new MinFlushBufferedWritableByteChannel(
269-
BufferHandle.allocate(minFlushSize), unbuffered, false);
271+
BufferHandle.allocate(minFlushSize), unbuffered, blocking);
270272
}
271273

272274
@Override

google-cloud-storage/src/test/java/com/google/cloud/storage/DefaultBufferedWritableByteChannelTest.java

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import static org.junit.Assert.fail;
2525

2626
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
27+
import com.google.cloud.storage.MinFlushBufferedWritableByteChannelTest.OnlyConsumeNBytes;
2728
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
29+
import com.google.cloud.storage.it.ChecksummedTestContent;
2830
import com.google.common.collect.ImmutableList;
2931
import java.io.ByteArrayOutputStream;
3032
import java.io.IOException;
@@ -401,6 +403,200 @@ public void close() throws IOException {
401403
assertThat(closed.get()).isTrue();
402404
}
403405

406+
@Example
407+
void nonBlockingWrite0DoesNotBlock() throws IOException {
408+
BufferHandle handle = BufferHandle.allocate(5);
409+
DefaultBufferedWritableByteChannel c =
410+
new DefaultBufferedWritableByteChannel(handle, new OnlyConsumeNBytes(0, 1), false);
411+
412+
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
413+
ByteBuffer s_0_4 = ByteBuffer.wrap(all.slice(0, 4).getBytes());
414+
ByteBuffer s_4_4 = ByteBuffer.wrap(all.slice(0, 4).getBytes());
415+
ByteBuffer s_8_3 = ByteBuffer.wrap(all.slice(0, 3).getBytes());
416+
int written1 = c.write(s_0_4);
417+
assertThat(written1).isEqualTo(4);
418+
assertThat(s_0_4.remaining()).isEqualTo(0);
419+
420+
int written2 = c.write(s_4_4);
421+
assertThat(written2).isEqualTo(0);
422+
assertThat(s_4_4.remaining()).isEqualTo(4);
423+
424+
int written3 = c.write(s_8_3);
425+
assertThat(written3).isEqualTo(0);
426+
assertThat(s_8_3.remaining()).isEqualTo(3);
427+
428+
assertThat(handle.remaining()).isEqualTo(1);
429+
}
430+
431+
@Example
432+
void nonBlockingWritePartialDoesNotBlock_withoutBuffering() throws IOException {
433+
BufferHandle handle = BufferHandle.allocate(4);
434+
OnlyConsumeNBytes channel = new OnlyConsumeNBytes(4, 4);
435+
DefaultBufferedWritableByteChannel c =
436+
new DefaultBufferedWritableByteChannel(handle, channel, false);
437+
438+
ChecksummedTestContent all = ChecksummedTestContent.gen(13);
439+
ByteBuffer s_0_4 = ByteBuffer.wrap(all.slice(0, 4).getBytes());
440+
ByteBuffer s_4_4 = ByteBuffer.wrap(all.slice(4, 4).getBytes());
441+
442+
// write all 4 bytes
443+
int written1 = c.write(s_0_4);
444+
assertThat(written1).isEqualTo(4);
445+
assertThat(s_0_4.remaining()).isEqualTo(0);
446+
assertThat(handle.remaining()).isEqualTo(4);
447+
assertThat(channel.getBytesConsumed()).isEqualTo(4);
448+
449+
// Attempt to write 4 bytes, but 0 will be consumed, break out without consuming any
450+
int written2 = c.write(s_4_4);
451+
assertThat(written2).isEqualTo(0);
452+
assertThat(s_4_4.remaining()).isEqualTo(4);
453+
assertThat(handle.remaining()).isEqualTo(4);
454+
assertThat(channel.getBytesConsumed()).isEqualTo(4);
455+
}
456+
457+
@Example
458+
void nonBlockingWritePartialDoesNotBlock_withoutBuffering_oversized() throws IOException {
459+
BufferHandle handle = BufferHandle.allocate(2);
460+
OnlyConsumeNBytes channel = new OnlyConsumeNBytes(4, 2);
461+
DefaultBufferedWritableByteChannel c =
462+
new DefaultBufferedWritableByteChannel(handle, channel, false);
463+
464+
ChecksummedTestContent all = ChecksummedTestContent.gen(13);
465+
ByteBuffer s_0_4 = ByteBuffer.wrap(all.slice(0, 4).getBytes());
466+
ByteBuffer s_4_4 = ByteBuffer.wrap(all.slice(4, 4).getBytes());
467+
468+
// write all 4 bytes
469+
int written1 = c.write(s_0_4);
470+
assertThat(written1).isEqualTo(4);
471+
assertThat(s_0_4.remaining()).isEqualTo(0);
472+
assertThat(handle.remaining()).isEqualTo(2);
473+
assertThat(channel.getBytesConsumed()).isEqualTo(4);
474+
475+
// Attempt to write 4 bytes, but 0 will be consumed, break out without consuming any
476+
int written2 = c.write(s_4_4);
477+
assertThat(written2).isEqualTo(0);
478+
assertThat(s_4_4.remaining()).isEqualTo(4);
479+
assertThat(handle.remaining()).isEqualTo(2);
480+
assertThat(channel.getBytesConsumed()).isEqualTo(4);
481+
}
482+
483+
@Example
484+
void nonBlockingWritePartialDoesNotBlock_withBuffering() throws IOException {
485+
BufferHandle handle = BufferHandle.allocate(5);
486+
OnlyConsumeNBytes channel = new OnlyConsumeNBytes(5, 5);
487+
DefaultBufferedWritableByteChannel c =
488+
new DefaultBufferedWritableByteChannel(handle, channel, false);
489+
490+
ChecksummedTestContent all = ChecksummedTestContent.gen(13);
491+
ByteBuffer s_0_4 = ByteBuffer.wrap(all.slice(0, 4).getBytes());
492+
ByteBuffer s_4_4 = ByteBuffer.wrap(all.slice(4, 4).getBytes());
493+
ByteBuffer s_8_12 = ByteBuffer.wrap(all.slice(8, 4).getBytes());
494+
495+
// write all 4 bytes
496+
int written1 = c.write(s_0_4);
497+
assertThat(written1).isEqualTo(4);
498+
assertThat(s_0_4.remaining()).isEqualTo(0);
499+
assertThat(handle.remaining()).isEqualTo(1);
500+
assertThat(channel.getBytesConsumed()).isEqualTo(0);
501+
502+
//
503+
int written2 = c.write(s_4_4);
504+
assertThat(written2).isEqualTo(4);
505+
assertThat(s_4_4.remaining()).isEqualTo(0);
506+
assertThat(handle.remaining()).isEqualTo(2);
507+
assertThat(channel.getBytesConsumed()).isEqualTo(5);
508+
509+
int written3 = c.write(s_8_12);
510+
assertThat(written3).isEqualTo(0);
511+
assertThat(s_8_12.remaining()).isEqualTo(4);
512+
assertThat(handle.remaining()).isEqualTo(2);
513+
assertThat(channel.getBytesConsumed()).isEqualTo(5);
514+
}
515+
516+
@Example
517+
void nonBlockingWritePartialDoesNotBlock_withBuffering_oversized() throws IOException {
518+
BufferHandle handle = BufferHandle.allocate(3);
519+
OnlyConsumeNBytes channel = new OnlyConsumeNBytes(6, 3);
520+
DefaultBufferedWritableByteChannel c =
521+
new DefaultBufferedWritableByteChannel(handle, channel, false);
522+
523+
ChecksummedTestContent all = ChecksummedTestContent.gen(13);
524+
ByteBuffer s_0_4 = ByteBuffer.wrap(all.slice(0, 4).getBytes());
525+
ByteBuffer s_4_4 = ByteBuffer.wrap(all.slice(4, 4).getBytes());
526+
ByteBuffer s_8_12 = ByteBuffer.wrap(all.slice(8, 4).getBytes());
527+
528+
// slice 3 bytes and consume them, then enqueue the remaining 1 byte
529+
int written1_1 = c.write(s_0_4);
530+
assertThat(written1_1).isEqualTo(4);
531+
assertThat(s_0_4.remaining()).isEqualTo(0);
532+
assertThat(handle.remaining()).isEqualTo(2);
533+
assertThat(channel.getBytesConsumed()).isEqualTo(3);
534+
535+
// write 1 buffered byte and 2 sliced bytes, enqueue 2 remaining
536+
int written2 = c.write(s_4_4);
537+
assertThat(written2).isEqualTo(4);
538+
assertThat(s_4_4.remaining()).isEqualTo(0);
539+
assertThat(handle.remaining()).isEqualTo(1);
540+
assertThat(channel.getBytesConsumed()).isEqualTo(6);
541+
542+
// attempt to write 4 bytes, non will be consumed and the buffer should remain the same
543+
int written3 = c.write(s_8_12);
544+
assertThat(written3).isEqualTo(0);
545+
assertThat(s_8_12.remaining()).isEqualTo(4);
546+
assertThat(handle.remaining()).isEqualTo(1);
547+
assertThat(channel.getBytesConsumed()).isEqualTo(6);
548+
}
549+
550+
@Example
551+
void illegalStateExceptionIfWrittenLt0_slice_eqBuffer() {
552+
BufferHandle handle = BufferHandle.allocate(4);
553+
DefaultBufferedWritableByteChannel c =
554+
new DefaultBufferedWritableByteChannel(handle, new NegativeOneWritableByteChannel(), false);
555+
556+
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
557+
IllegalStateException ise =
558+
assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 4).asByteBuffer()));
559+
ise.printStackTrace(System.out);
560+
}
561+
562+
@Example
563+
void illegalStateExceptionIfWrittenLt0_slice_gtBuffer() {
564+
BufferHandle handle = BufferHandle.allocate(4);
565+
DefaultBufferedWritableByteChannel c =
566+
new DefaultBufferedWritableByteChannel(handle, new NegativeOneWritableByteChannel(), false);
567+
568+
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
569+
IllegalStateException ise =
570+
assertThrows(IllegalStateException.class, () -> c.write(all.slice(0, 5).asByteBuffer()));
571+
ise.printStackTrace(System.out);
572+
}
573+
574+
@Example
575+
void illegalStateExceptionIfWrittenLt0_slice_ltBuffer() {
576+
BufferHandle handle = BufferHandle.allocate(4);
577+
DefaultBufferedWritableByteChannel c =
578+
new DefaultBufferedWritableByteChannel(handle, new NegativeOneWritableByteChannel(), false);
579+
580+
ChecksummedTestContent all = ChecksummedTestContent.gen(11);
581+
IllegalStateException ise =
582+
assertThrows(
583+
IllegalStateException.class,
584+
() -> {
585+
int written1 = c.write(all.slice(0, 3).asByteBuffer());
586+
assertThat(written1).isEqualTo(3);
587+
c.write(all.slice(3, 3).asByteBuffer());
588+
fail("should have errored in previous write call");
589+
});
590+
ise.printStackTrace(System.out);
591+
}
592+
593+
@Example
594+
void test() {
595+
illegalStateExceptionIfWrittenLt0_slice_eqBuffer();
596+
illegalStateExceptionIfWrittenLt0_slice_gtBuffer();
597+
illegalStateExceptionIfWrittenLt0_slice_ltBuffer();
598+
}
599+
404600
@Property
405601
void bufferAllocationShouldOnlyHappenWhenNeeded(@ForAll("BufferSizes") WriteOps writeOps)
406602
throws IOException {
@@ -697,4 +893,20 @@ public ByteBuffer get() {
697893
return delegate.get();
698894
}
699895
}
896+
897+
private static class NegativeOneWritableByteChannel implements UnbufferedWritableByteChannel {
898+
899+
@Override
900+
public long write(ByteBuffer[] srcs, int offset, int length) {
901+
return -1;
902+
}
903+
904+
@Override
905+
public boolean isOpen() {
906+
return true;
907+
}
908+
909+
@Override
910+
public void close() {}
911+
}
700912
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITAppendableUploadFakeTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ public void testFlushMultipleSegments_failsHalfway_partialFlush() throws Excepti
491491
smallSegmenter,
492492
0);
493493
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
494-
channel.write(ByteBuffer.wrap(content.getBytes()));
494+
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
495495
channel.nextWriteShouldFinalize();
496496
channel.close();
497497
assertThat(done.get(777, TimeUnit.MILLISECONDS).getResource().getSize()).isEqualTo(10);
@@ -641,8 +641,8 @@ public void testFlushMultipleSegmentsTwice_firstSucceeds_secondFailsHalfway_part
641641
0);
642642
ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
643643
ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10);
644-
channel.write(ByteBuffer.wrap(content1.getBytes()));
645-
channel.write(ByteBuffer.wrap(content2.getBytes()));
644+
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content1.getBytes()), channel);
645+
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content2.getBytes()), channel);
646646
channel.nextWriteShouldFinalize();
647647
channel.close();
648648
assertThat(done.get(777, TimeUnit.MILLISECONDS).getResource().getSize()).isEqualTo(20);
@@ -793,7 +793,7 @@ public void testFlushMultipleSegments_200ResponsePartialFlushHalfway() throws Ex
793793
BidiAppendableUnbufferedWritableByteChannel channel =
794794
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 0);
795795
ChecksummedTestContent content = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 0, 10);
796-
channel.write(ByteBuffer.wrap(content.getBytes()));
796+
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
797797
channel.nextWriteShouldFinalize();
798798
channel.close();
799799
assertThat(stream.getResultFuture().get(777, TimeUnit.MILLISECONDS).getResource().getSize())
@@ -912,7 +912,7 @@ public void crc32cWorks() throws Exception {
912912
BlobAppendableUpload upload =
913913
storage.blobAppendableUpload(BlobInfo.newBuilder(id).build(), config);
914914
try (AppendableUploadWriteableByteChannel channel = upload.open()) {
915-
channel.write(ByteBuffer.wrap(b));
915+
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(b), channel);
916916
}
917917
ApiFuture<BlobInfo> result = upload.getResult();
918918
result.get(5, TimeUnit.SECONDS);
@@ -1050,7 +1050,7 @@ private static void runTestFlushMultipleSegments(FakeStorage fake) throws Except
10501050
storage.storageDataClient.retryContextProvider.create());
10511051
BidiAppendableUnbufferedWritableByteChannel channel =
10521052
new BidiAppendableUnbufferedWritableByteChannel(stream, smallSegmenter, 0);
1053-
channel.write(ByteBuffer.wrap(content.getBytes()));
1053+
StorageChannelUtils.blockingEmptyTo(ByteBuffer.wrap(content.getBytes()), channel);
10541054
channel.nextWriteShouldFinalize();
10551055
channel.close();
10561056
BidiWriteObjectResponse response = stream.getResultFuture().get(777, TimeUnit.MILLISECONDS);

0 commit comments

Comments
 (0)