|
16 | 16 |
|
17 | 17 | package com.google.cloud.storage; |
18 | 18 |
|
19 | | -import static com.google.cloud.storage.ByteSizeConstants._2MiB; |
20 | | -import static java.util.Objects.requireNonNull; |
21 | | - |
22 | 19 | import com.google.api.services.storage.Storage; |
23 | 20 | import com.google.api.services.storage.model.StorageObject; |
24 | 21 | import com.google.cloud.ReadChannel; |
25 | 22 | import com.google.cloud.RestorableState; |
26 | 23 | import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest; |
27 | | -import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel; |
28 | 24 | import com.google.cloud.storage.spi.v1.StorageRpc; |
29 | 25 | import com.google.common.base.MoreObjects; |
30 | | -import java.io.IOException; |
31 | 26 | import java.io.Serializable; |
32 | | -import java.nio.ByteBuffer; |
33 | 27 | import java.util.Map; |
34 | 28 | import java.util.Objects; |
35 | 29 |
|
36 | | -final class BlobReadChannelV2 implements StorageReadChannel { |
| 30 | +final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> { |
37 | 31 |
|
38 | 32 | private final StorageObject storageObject; |
39 | 33 | private final Map<StorageRpc.Option, ?> opts; |
40 | 34 | private final BlobReadChannelContext blobReadChannelContext; |
41 | 35 |
|
42 | | - private LazyReadChannel<StorageObject> lazyReadChannel; |
43 | | - private StorageObject resolvedObject; |
44 | | - private ByteRangeSpec byteRangeSpec; |
45 | | - |
46 | | - private int chunkSize = _2MiB; |
47 | | - private BufferHandle bufferHandle; |
48 | | - |
49 | 36 | BlobReadChannelV2( |
50 | 37 | StorageObject storageObject, |
51 | 38 | Map<StorageRpc.Option, ?> opts, |
52 | 39 | BlobReadChannelContext blobReadChannelContext) { |
53 | 40 | this.storageObject = storageObject; |
54 | 41 | this.opts = opts; |
55 | 42 | this.blobReadChannelContext = blobReadChannelContext; |
56 | | - this.byteRangeSpec = ByteRangeSpec.nullRange(); |
57 | | - } |
58 | | - |
59 | | - @Override |
60 | | - public synchronized void setChunkSize(int chunkSize) { |
61 | | - StorageException.wrapIOException(() -> maybeResetChannel(true)); |
62 | | - this.chunkSize = chunkSize; |
63 | | - } |
64 | | - |
65 | | - @Override |
66 | | - public synchronized boolean isOpen() { |
67 | | - if (lazyReadChannel == null) { |
68 | | - return true; |
69 | | - } else { |
70 | | - LazyReadChannel<StorageObject> tmp = internalGetLazyChannel(); |
71 | | - return tmp.isOpen(); |
72 | | - } |
73 | | - } |
74 | | - |
75 | | - @Override |
76 | | - public synchronized void close() { |
77 | | - if (internalGetLazyChannel().isOpen()) { |
78 | | - StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close); |
79 | | - } |
80 | | - } |
81 | | - |
82 | | - @Override |
83 | | - public synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) { |
84 | | - requireNonNull(byteRangeSpec, "byteRangeSpec must be non null"); |
85 | | - StorageException.wrapIOException(() -> maybeResetChannel(false)); |
86 | | - this.byteRangeSpec = byteRangeSpec; |
87 | | - return this; |
88 | | - } |
89 | | - |
90 | | - @Override |
91 | | - public ByteRangeSpec getByteRangeSpec() { |
92 | | - return byteRangeSpec; |
93 | | - } |
94 | | - |
95 | | - @Override |
96 | | - public synchronized int read(ByteBuffer dst) throws IOException { |
97 | | - long diff = byteRangeSpec.length(); |
98 | | - if (diff <= 0) { |
99 | | - close(); |
100 | | - return -1; |
101 | | - } |
102 | | - try { |
103 | | - int read = internalGetLazyChannel().getChannel().read(dst); |
104 | | - if (read != -1) { |
105 | | - byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read); |
106 | | - } else { |
107 | | - close(); |
108 | | - } |
109 | | - return read; |
110 | | - } catch (StorageException e) { |
111 | | - if (e.getCode() == 416) { |
112 | | - // HttpStorageRpc turns 416 into a null etag with an empty byte array, leading |
113 | | - // BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open. |
114 | | - // Emulate that same behavior here to preserve behavior compatibility, though this should |
115 | | - // be removed in the next major version. |
116 | | - return -1; |
117 | | - } else { |
118 | | - throw new IOException(e); |
119 | | - } |
120 | | - } catch (IOException e) { |
121 | | - throw e; |
122 | | - } catch (Exception e) { |
123 | | - throw new IOException(StorageException.coalesce(e)); |
124 | | - } |
125 | 43 | } |
126 | 44 |
|
127 | 45 | @Override |
128 | 46 | public RestorableState<ReadChannel> capture() { |
129 | 47 | ApiaryReadRequest apiaryReadRequest = getApiaryReadRequest(); |
130 | 48 | return new BlobReadChannelV2State( |
131 | | - apiaryReadRequest, blobReadChannelContext.getStorageOptions(), chunkSize); |
| 49 | + apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize()); |
132 | 50 | } |
133 | 51 |
|
134 | | - private void maybeResetChannel(boolean umallocBuffer) throws IOException { |
135 | | - if (lazyReadChannel != null && lazyReadChannel.isOpen()) { |
136 | | - try (BufferedReadableByteChannel ignore = lazyReadChannel.getChannel()) { |
137 | | - if (bufferHandle != null && !umallocBuffer) { |
138 | | - bufferHandle.get().clear(); |
139 | | - } else if (umallocBuffer) { |
140 | | - bufferHandle = null; |
141 | | - } |
142 | | - lazyReadChannel = null; |
143 | | - } |
144 | | - } |
145 | | - } |
146 | | - |
147 | | - private LazyReadChannel<StorageObject> internalGetLazyChannel() { |
148 | | - if (lazyReadChannel == null) { |
149 | | - lazyReadChannel = newLazyReadChannel(); |
150 | | - } |
151 | | - return lazyReadChannel; |
152 | | - } |
153 | | - |
154 | | - private LazyReadChannel<StorageObject> newLazyReadChannel() { |
| 52 | + protected LazyReadChannel<StorageObject> newLazyReadChannel() { |
155 | 53 | return new LazyReadChannel<>( |
156 | | - () -> { |
157 | | - if (bufferHandle == null) { |
158 | | - bufferHandle = BufferHandle.allocate(chunkSize); |
159 | | - } |
160 | | - return ResumableMedia.http() |
161 | | - .read() |
162 | | - .byteChannel(blobReadChannelContext) |
163 | | - .setCallback(this::setResolvedObject) |
164 | | - .buffered(bufferHandle) |
165 | | - .setApiaryReadRequest(getApiaryReadRequest()) |
166 | | - .build(); |
167 | | - }); |
168 | | - } |
169 | | - |
170 | | - private void setResolvedObject(StorageObject resolvedObject) { |
171 | | - this.resolvedObject = resolvedObject; |
| 54 | + () -> |
| 55 | + ResumableMedia.http() |
| 56 | + .read() |
| 57 | + .byteChannel(blobReadChannelContext) |
| 58 | + .setCallback(this::setResolvedObject) |
| 59 | + .buffered(getBufferHandle()) |
| 60 | + .setApiaryReadRequest(getApiaryReadRequest()) |
| 61 | + .build()); |
172 | 62 | } |
173 | 63 |
|
174 | 64 | private ApiaryReadRequest getApiaryReadRequest() { |
175 | | - StorageObject object = resolvedObject != null ? resolvedObject : storageObject; |
176 | | - return new ApiaryReadRequest(object, opts, byteRangeSpec); |
| 65 | + StorageObject object = getResolvedObject() != null ? getResolvedObject() : storageObject; |
| 66 | + return new ApiaryReadRequest(object, opts, getByteRangeSpec()); |
177 | 67 | } |
178 | 68 |
|
179 | 69 | static class BlobReadChannelV2State implements RestorableState<ReadChannel>, Serializable { |
|
0 commit comments