Skip to content

Commit eb8c5c8

Browse files
authored
chore: add com.google.cloud.storage.RangeProjectionConfigs.SeekableChannelConfig which allows creating a SeekableByteChannel projection for a BlobReadSession
1 parent 50e4589 commit eb8c5c8

4 files changed

Lines changed: 359 additions & 21 deletions

File tree

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.common.base.Preconditions.checkState;
20+
21+
import com.google.cloud.storage.RangeProjectionConfigs.SeekableChannelConfig;
22+
import com.google.common.base.Preconditions;
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import java.nio.channels.ClosedChannelException;
26+
import java.nio.channels.ReadableByteChannel;
27+
import java.nio.channels.SeekableByteChannel;
28+
import org.checkerframework.checker.nullness.qual.Nullable;
29+
30+
final class ObjectReadSessionSeekableByteChannel implements SeekableByteChannel, IOAutoCloseable {
31+
32+
private final ObjectReadSession session;
33+
private final SeekableChannelConfig config;
34+
private final long size;
35+
private final IOAutoCloseable closeAlongWithThis;
36+
37+
private ReadableByteChannel rbc;
38+
39+
private long position;
40+
private boolean open = true;
41+
42+
@Nullable private RangeSpec lastRangeSpec;
43+
44+
ObjectReadSessionSeekableByteChannel(
45+
ObjectReadSession session, SeekableChannelConfig config, IOAutoCloseable closeAlongWithThis) {
46+
this.session = session;
47+
this.config = config;
48+
this.closeAlongWithThis = closeAlongWithThis;
49+
this.size = session.getResource().getSize();
50+
this.position = 0;
51+
}
52+
53+
@Override
54+
public int read(ByteBuffer dst) throws IOException {
55+
if (!open) {
56+
throw new ClosedChannelException();
57+
}
58+
if (remaining() <= 0) {
59+
return -1;
60+
}
61+
62+
int totalRead = 0;
63+
if (rbc == null) {
64+
RangeSpec apply = config.getRangeSpecFunction().apply(position, lastRangeSpec);
65+
checkState(
66+
apply.begin() == position,
67+
"RangeSpec does not begin at provided position. expected = %s, actual = %s",
68+
position,
69+
apply.begin());
70+
rbc = session.readRange(apply, RangeProjectionConfigs.asChannel());
71+
lastRangeSpec = apply;
72+
}
73+
74+
int read = rbc.read(dst);
75+
if (read < 0) {
76+
rbc.close();
77+
rbc = null;
78+
} else {
79+
totalRead += read;
80+
position += read;
81+
}
82+
83+
return totalRead;
84+
}
85+
86+
private long remaining() {
87+
return size - position;
88+
}
89+
90+
@Override
91+
public long size() throws IOException {
92+
return size;
93+
}
94+
95+
@Override
96+
public long position() throws IOException {
97+
return position;
98+
}
99+
100+
@Override
101+
public SeekableByteChannel position(long newPosition) throws IOException {
102+
Preconditions.checkArgument(newPosition >= 0, "newPosition >= 0 (%s >= 0)", newPosition);
103+
if (position == newPosition) {
104+
return this;
105+
}
106+
position = newPosition;
107+
try (ReadableByteChannel ignore = rbc) {
108+
rbc = null;
109+
}
110+
return this;
111+
}
112+
113+
@Override
114+
public int write(ByteBuffer src) throws IOException {
115+
throw new UnsupportedOperationException("write(ByteBuffer)");
116+
}
117+
118+
@Override
119+
public SeekableByteChannel truncate(long size) throws IOException {
120+
throw new UnsupportedOperationException("truncate(long)");
121+
}
122+
123+
@Override
124+
public boolean isOpen() {
125+
return open;
126+
}
127+
128+
@Override
129+
public void close() throws IOException {
130+
try (IOAutoCloseable ignore1 = closeAlongWithThis;
131+
ReadableByteChannel ignore2 = rbc) {
132+
open = false;
133+
rbc = null;
134+
}
135+
}
136+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/RangeProjectionConfigs.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static java.util.Objects.requireNonNull;
20+
1921
import com.google.api.core.ApiFuture;
2022
import com.google.cloud.storage.BaseObjectReadSessionStreamRead.AccumulatingRead;
2123
import com.google.cloud.storage.BaseObjectReadSessionStreamRead.StreamingRead;
2224
import com.google.cloud.storage.ZeroCopySupport.DisposableByteString;
25+
import com.google.common.base.MoreObjects;
2326
import java.nio.channels.ScatteringByteChannel;
27+
import java.nio.channels.SeekableByteChannel;
28+
import java.util.Objects;
2429

2530
public final class RangeProjectionConfigs {
2631

@@ -53,6 +58,68 @@ static RangeAsFutureByteString asFutureByteString() {
5358
return RangeAsFutureByteString.INSTANCE;
5459
}
5560

61+
public static SeekableChannelConfig asSeekableChannel() {
62+
return SeekableChannelConfig.INSTANCE;
63+
}
64+
65+
public static final class SeekableChannelConfig
66+
extends RangeProjectionConfig<SeekableByteChannel> {
67+
68+
private static final SeekableChannelConfig INSTANCE =
69+
new SeekableChannelConfig(LinearExponentialRangeSpecFunction.INSTANCE);
70+
71+
private final RangeSpecFunction rangeSpecFunction;
72+
73+
private SeekableChannelConfig(RangeSpecFunction rangeSpecFunction) {
74+
this.rangeSpecFunction = rangeSpecFunction;
75+
}
76+
77+
public RangeSpecFunction getRangeSpecFunction() {
78+
return rangeSpecFunction;
79+
}
80+
81+
public SeekableChannelConfig withRangeSpecFunction(RangeSpecFunction rangeSpecFunction) {
82+
requireNonNull(rangeSpecFunction, "rangeSpecFunction must be non null");
83+
return new SeekableChannelConfig(rangeSpecFunction);
84+
}
85+
86+
@Override
87+
SeekableByteChannel project(
88+
RangeSpec range, ObjectReadSession session, IOAutoCloseable closeAlongWith) {
89+
return StorageByteChannels.seekable(
90+
new ObjectReadSessionSeekableByteChannel(session, this, closeAlongWith));
91+
}
92+
93+
@Override
94+
ProjectionType getType() {
95+
return ProjectionType.SESSION_USER;
96+
}
97+
98+
@Override
99+
public boolean equals(Object o) {
100+
if (this == o) {
101+
return true;
102+
}
103+
if (!(o instanceof SeekableChannelConfig)) {
104+
return false;
105+
}
106+
SeekableChannelConfig that = (SeekableChannelConfig) o;
107+
return Objects.equals(rangeSpecFunction, that.rangeSpecFunction);
108+
}
109+
110+
@Override
111+
public int hashCode() {
112+
return Objects.hashCode(rangeSpecFunction);
113+
}
114+
115+
@Override
116+
public String toString() {
117+
return MoreObjects.toStringHelper(this)
118+
.add("rangeSpecFunction", rangeSpecFunction)
119+
.toString();
120+
}
121+
}
122+
56123
public static final class RangeAsChannel
57124
extends BaseConfig<ScatteringByteChannel, StreamingRead> {
58125
private static final RangeAsChannel INSTANCE = new RangeAsChannel();

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageByteChannels.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.channels.ClosedChannelException;
2626
import java.nio.channels.ReadableByteChannel;
2727
import java.nio.channels.ScatteringByteChannel;
28+
import java.nio.channels.SeekableByteChannel;
2829
import java.util.concurrent.locks.ReentrantLock;
2930

3031
final class StorageByteChannels {
@@ -37,6 +38,10 @@ static Writable writable() {
3738
return Writable.INSTANCE;
3839
}
3940

41+
public static SeekableByteChannel seekable(SeekableByteChannel delegate) {
42+
return new SynchronizedSeekableByteChannel(delegate);
43+
}
44+
4045
static final class Readable {
4146
private static final Readable INSTANCE = new Readable();
4247

@@ -376,4 +381,94 @@ public void close() throws IOException {
376381
c.close();
377382
}
378383
}
384+
385+
private static final class SynchronizedSeekableByteChannel implements SeekableByteChannel {
386+
private final SeekableByteChannel delegate;
387+
private final ReentrantLock lock;
388+
389+
private SynchronizedSeekableByteChannel(SeekableByteChannel delegate) {
390+
this.delegate = delegate;
391+
this.lock = new ReentrantLock();
392+
}
393+
394+
@Override
395+
public int read(ByteBuffer dst) throws IOException {
396+
lock.lock();
397+
try {
398+
return delegate.read(dst);
399+
} finally {
400+
lock.unlock();
401+
}
402+
}
403+
404+
@Override
405+
public int write(ByteBuffer src) throws IOException {
406+
lock.lock();
407+
try {
408+
return delegate.write(src);
409+
} finally {
410+
lock.unlock();
411+
}
412+
}
413+
414+
@Override
415+
public long position() throws IOException {
416+
lock.lock();
417+
try {
418+
return delegate.position();
419+
} finally {
420+
lock.unlock();
421+
}
422+
}
423+
424+
@Override
425+
public SeekableByteChannel position(long newPosition) throws IOException {
426+
lock.lock();
427+
try {
428+
return delegate.position(newPosition);
429+
} finally {
430+
lock.unlock();
431+
}
432+
}
433+
434+
@Override
435+
public long size() throws IOException {
436+
lock.lock();
437+
try {
438+
return delegate.size();
439+
} finally {
440+
lock.unlock();
441+
}
442+
}
443+
444+
@Override
445+
public SeekableByteChannel truncate(long size) throws IOException {
446+
lock.lock();
447+
try {
448+
return delegate.truncate(size);
449+
} finally {
450+
lock.unlock();
451+
}
452+
}
453+
454+
@Override
455+
public boolean isOpen() {
456+
lock.lock();
457+
try {
458+
return delegate.isOpen();
459+
} finally {
460+
lock.unlock();
461+
}
462+
}
463+
464+
@Override
465+
public void close() throws IOException {
466+
lock.lock();
467+
try {
468+
delegate.close();
469+
} finally {
470+
lock.unlock();
471+
}
472+
}
473+
}
379474
}

0 commit comments

Comments
 (0)