Skip to content

Commit 39829a0

Browse files
committed
Merge pull request #390 from mziccard/rewriter-check-blob-update
Make BlobReadChannel should fail if content is updated
2 parents a4c4273 + bfe8924 commit 39829a0

File tree

9 files changed

+128
-32
lines changed

9 files changed

+128
-32
lines changed

gcloud-java-storage/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,8 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) {
429429
}
430430

431431
@Override
432-
public byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
433-
throws StorageException {
432+
public Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position,
433+
int bytes) throws StorageException {
434434
try {
435435
Get req = storage.objects()
436436
.get(from.getBucket(), from.getName())
@@ -439,12 +439,13 @@ public byte[] read(StorageObject from, Map<Option, ?> options, long position, in
439439
.setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(options))
440440
.setIfGenerationMatch(IF_GENERATION_MATCH.getLong(options))
441441
.setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(options));
442-
MediaHttpDownloader downloader = req.getMediaHttpDownloader();
443-
downloader.setContentRange(position, Ints.checkedCast(position + bytes - 1));
444-
downloader.setDirectDownloadEnabled(true);
442+
StringBuilder range = new StringBuilder();
443+
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
444+
req.getRequestHeaders().setRange(range.toString());
445445
ByteArrayOutputStream output = new ByteArrayOutputStream();
446-
req.executeMediaAndDownloadTo(output);
447-
return output.toByteArray();
446+
req.executeMedia().download(output);
447+
String etag = req.getLastResponseHeaders().getETag();
448+
return Tuple.of(etag, output.toByteArray());
448449
} catch (IOException ex) {
449450
throw translate(ex);
450451
}

gcloud-java-storage/src/main/java/com/google/gcloud/spi/StorageRpc.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ StorageObject compose(Iterable<StorageObject> sources, StorageObject target,
259259
byte[] load(StorageObject storageObject, Map<Option, ?> options)
260260
throws StorageException;
261261

262-
byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
262+
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes)
263263
throws StorageException;
264264

265265
String open(StorageObject object, Map<Option, ?> options) throws StorageException;

gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@
2626
/**
2727
* A channel for reading data from a Google Cloud Storage object.
2828
*
29-
* Implementations of this class may buffer data internally to reduce remote calls.
30-
*
31-
* This class is @{link Serializable}, which allows incremental reads.
29+
* Implementations of this class may buffer data internally to reduce remote calls. This interface
30+
* implements {@link Restorable} to allow saving the reader's state to continue reading afterwards.
3231
*/
3332
public interface BlobReadChannel extends ReadableByteChannel, Closeable,
3433
Restorable<BlobReadChannel> {

gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.gcloud.RestorableState;
2424
import com.google.gcloud.RetryHelper;
2525
import com.google.gcloud.spi.StorageRpc;
26+
import com.google.gcloud.spi.StorageRpc.Tuple;
2627

2728
import java.io.IOException;
2829
import java.io.Serializable;
@@ -41,6 +42,7 @@ class BlobReadChannelImpl implements BlobReadChannel {
4142
private final StorageOptions serviceOptions;
4243
private final BlobId blob;
4344
private final Map<StorageRpc.Option, ?> requestOptions;
45+
private String lastEtag;
4446
private int position;
4547
private boolean isOpen;
4648
private boolean endOfStream;
@@ -117,12 +119,19 @@ public int read(ByteBuffer byteBuffer) throws IOException {
117119
}
118120
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
119121
try {
120-
buffer = runWithRetries(new Callable<byte[]>() {
122+
Tuple<String, byte[]> result = runWithRetries(new Callable<Tuple<String, byte[]>>() {
121123
@Override
122-
public byte[] call() {
124+
public Tuple<String, byte[]> call() {
123125
return storageRpc.read(storageObject, requestOptions, position, toRead);
124126
}
125127
}, serviceOptions.retryParams(), StorageImpl.EXCEPTION_HANDLER);
128+
if (lastEtag != null && !Objects.equals(result.x(), lastEtag)) {
129+
StringBuilder messageBuilder = new StringBuilder();
130+
messageBuilder.append("Blob ").append(blob).append(" was updated while reading");
131+
throw new StorageException(0, messageBuilder.toString(), false);
132+
}
133+
lastEtag = result.x();
134+
buffer = result.y();
126135
} catch (RetryHelper.RetryHelperException e) {
127136
throw StorageException.translateAndThrow(e);
128137
}
@@ -152,6 +161,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
152161
private final StorageOptions serviceOptions;
153162
private final BlobId blob;
154163
private final Map<StorageRpc.Option, ?> requestOptions;
164+
private final String lastEtag;
155165
private final int position;
156166
private final boolean isOpen;
157167
private final boolean endOfStream;
@@ -161,6 +171,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
161171
this.serviceOptions = builder.serviceOptions;
162172
this.blob = builder.blob;
163173
this.requestOptions = builder.requestOptions;
174+
this.lastEtag = builder.lastEtag;
164175
this.position = builder.position;
165176
this.isOpen = builder.isOpen;
166177
this.endOfStream = builder.endOfStream;
@@ -171,6 +182,7 @@ static class Builder {
171182
private final StorageOptions serviceOptions;
172183
private final BlobId blob;
173184
private final Map<StorageRpc.Option, ?> requestOptions;
185+
private String lastEtag;
174186
private int position;
175187
private boolean isOpen;
176188
private boolean endOfStream;
@@ -182,6 +194,11 @@ private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> r
182194
this.requestOptions = reqOptions;
183195
}
184196

197+
Builder lastEtag(String lastEtag) {
198+
this.lastEtag = lastEtag;
199+
return this;
200+
}
201+
185202
Builder position(int position) {
186203
this.position = position;
187204
return this;
@@ -215,6 +232,7 @@ static Builder builder(
215232
@Override
216233
public BlobReadChannel restore() {
217234
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
235+
channel.lastEtag = lastEtag;
218236
channel.position = position;
219237
channel.isOpen = isOpen;
220238
channel.endOfStream = endOfStream;
@@ -224,8 +242,8 @@ public BlobReadChannel restore() {
224242

225243
@Override
226244
public int hashCode() {
227-
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
228-
chunkSize);
245+
return Objects.hash(serviceOptions, blob, requestOptions, lastEtag, position, isOpen,
246+
endOfStream, chunkSize);
229247
}
230248

231249
@Override
@@ -240,6 +258,7 @@ public boolean equals(Object obj) {
240258
return Objects.equals(this.serviceOptions, other.serviceOptions)
241259
&& Objects.equals(this.blob, other.blob)
242260
&& Objects.equals(this.requestOptions, other.requestOptions)
261+
&& Objects.equals(this.lastEtag, other.lastEtag)
243262
&& this.position == other.position
244263
&& this.isOpen == other.isOpen
245264
&& this.endOfStream == other.endOfStream

gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
* A channel for writing data to a Google Cloud Storage object.
2727
*
2828
* Implementations of this class may further buffer data internally to reduce remote calls. Written
29-
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
30-
* incremental writes.
29+
* data will only be visible after calling {@link #close()}. This interface implements
30+
* {@link Restorable} to allow saving the writer's state to continue writing afterwards.
3131
*/
3232
public interface BlobWriteChannel extends WritableByteChannel, Closeable,
3333
Restorable<BlobWriteChannel> {

gcloud-java-storage/src/main/java/com/google/gcloud/storage/Storage.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.InputStream;
3434
import java.io.Serializable;
3535
import java.net.URL;
36+
import java.nio.ByteBuffer;
3637
import java.util.Arrays;
3738
import java.util.Collections;
3839
import java.util.HashSet;
@@ -1405,14 +1406,29 @@ private static void checkContentType(BlobInfo blobInfo) throws IllegalArgumentEx
14051406
BatchResponse apply(BatchRequest batchRequest);
14061407

14071408
/**
1408-
* Return a channel for reading the blob's content.
1409+
* Return a channel for reading the blob's content. The blob's latest generation is read. If the
1410+
* blob changes while reading (i.e. {@link BlobInfo#etag()} changes), subsequent calls to
1411+
* {@code blobReadChannel.read(ByteBuffer)} may throw {@link StorageException}.
1412+
*
1413+
* <p>The {@link BlobSourceOption#generationMatch(long)} option can be provided to ensure that
1414+
* {@code blobReadChannel.read(ByteBuffer)} calls will throw {@link StorageException} if blob`s
1415+
* generation differs from the expected one.
14091416
*
14101417
* @throws StorageException upon failure
14111418
*/
14121419
BlobReadChannel reader(String bucket, String blob, BlobSourceOption... options);
14131420

14141421
/**
1415-
* Return a channel for reading the blob's content.
1422+
* Return a channel for reading the blob's content. If {@code blob.generation()} is set
1423+
* data corresponding to that generation is read. If {@code blob.generation()} is {@code null}
1424+
* the blob's latest generation is read. If the blob changes while reading (i.e.
1425+
* {@link BlobInfo#etag()} changes), subsequent calls to {@code blobReadChannel.read(ByteBuffer)}
1426+
* may throw {@link StorageException}.
1427+
*
1428+
* <p>The {@link BlobSourceOption#generationMatch()} and
1429+
* {@link BlobSourceOption#generationMatch(long)} options can be used to ensure that
1430+
* {@code blobReadChannel.read(ByteBuffer)} calls will throw {@link StorageException} if the
1431+
* blob`s generation differs from the expected one.
14161432
*
14171433
* @throws StorageException upon failure
14181434
*/

gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.easymock.EasyMock.anyObject;
2020
import static org.easymock.EasyMock.createMock;
2121
import static org.easymock.EasyMock.expect;
22-
import static org.easymock.EasyMock.expectLastCall;
2322
import static org.easymock.EasyMock.replay;
2423
import static org.easymock.EasyMock.verify;
2524
import static org.junit.Assert.assertArrayEquals;
@@ -46,7 +45,7 @@ public class BlobReadChannelImplTest {
4645

4746
private static final String BUCKET_NAME = "b";
4847
private static final String BLOB_NAME = "n";
49-
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME);
48+
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME, -1L);
5049
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
5150
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
5251
private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024;
@@ -88,7 +87,7 @@ public void testReadBuffered() throws IOException {
8887
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
8988
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
9089
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
91-
.andReturn(result);
90+
.andReturn(StorageRpc.Tuple.of("etag", result));
9291
replay(storageRpcMock);
9392
reader.read(firstReadBuffer);
9493
reader.read(secondReadBuffer);
@@ -107,10 +106,11 @@ public void testReadBig() throws IOException {
107106
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
108107
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
109108
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
110-
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE);
111-
expectLastCall().andReturn(firstResult);
112-
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE);
113-
expectLastCall().andReturn(secondResult);
109+
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
110+
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
111+
expect(storageRpcMock.read(
112+
BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE))
113+
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
114114
replay(storageRpcMock);
115115
reader.read(firstReadBuffer);
116116
reader.read(secondReadBuffer);
@@ -125,7 +125,7 @@ public void testReadFinish() throws IOException {
125125
byte[] result = {};
126126
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
127127
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
128-
.andReturn(result);
128+
.andReturn(StorageRpc.Tuple.of("etag", result));
129129
replay(storageRpcMock);
130130
assertEquals(-1, reader.read(readBuffer));
131131
}
@@ -137,7 +137,7 @@ public void testSeek() throws IOException {
137137
byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE);
138138
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
139139
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
140-
.andReturn(result);
140+
.andReturn(StorageRpc.Tuple.of("etag", result));
141141
replay(storageRpcMock);
142142
reader.read(readBuffer);
143143
assertArrayEquals(result, readBuffer.array());
@@ -166,16 +166,41 @@ public void testReadClosed() {
166166
}
167167
}
168168

169+
@Test
170+
public void testReadGenerationChanged() throws IOException {
171+
BlobId blobId = BlobId.of(BUCKET_NAME, BLOB_NAME);
172+
reader = new BlobReadChannelImpl(options, blobId, EMPTY_RPC_OPTIONS);
173+
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
174+
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
175+
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
176+
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
177+
expect(storageRpcMock.read(blobId.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
178+
.andReturn(StorageRpc.Tuple.of("etag1", firstResult));
179+
expect(storageRpcMock.read(
180+
blobId.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE))
181+
.andReturn(StorageRpc.Tuple.of("etag2", firstResult));
182+
replay(storageRpcMock);
183+
reader.read(firstReadBuffer);
184+
try {
185+
reader.read(secondReadBuffer);
186+
fail("Expected BlobReadChannel read to throw StorageException");
187+
} catch (StorageException ex) {
188+
StringBuilder messageBuilder = new StringBuilder();
189+
messageBuilder.append("Blob ").append(blobId).append(" was updated while reading");
190+
assertEquals(messageBuilder.toString(), ex.getMessage());
191+
}
192+
}
193+
169194
@Test
170195
public void testSaveAndRestore() throws IOException, ClassNotFoundException {
171196
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
172197
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
173198
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
174199
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
175200
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
176-
.andReturn(firstResult);
201+
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
177202
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
178-
.andReturn(secondResult);
203+
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
179204
replay(storageRpcMock);
180205
reader = new BlobReadChannelImpl(options, BLOB_ID, EMPTY_RPC_OPTIONS);
181206
reader.read(firstReadBuffer);

gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Iterator;
5050
import java.util.List;
5151
import java.util.Map;
52+
import java.util.Random;
5253
import java.util.concurrent.ExecutionException;
5354
import java.util.concurrent.TimeUnit;
5455
import java.util.concurrent.TimeoutException;
@@ -731,6 +732,41 @@ public void testReadChannelFail() throws IOException {
731732
assertTrue(storage.delete(BUCKET, blobName));
732733
}
733734

735+
@Test
736+
public void testReadChannelFailUpdatedGeneration() throws IOException {
737+
String blobName = "test-read-blob-fail-updated-generation";
738+
BlobInfo blob = BlobInfo.builder(BUCKET, blobName).build();
739+
Random random = new Random();
740+
int chunkSize = 1024;
741+
int blobSize = 2 * chunkSize;
742+
byte[] content = new byte[blobSize];
743+
random.nextBytes(content);
744+
BlobInfo remoteBlob = storage.create(blob, content);
745+
assertNotNull(remoteBlob);
746+
assertEquals(blobSize, (long) remoteBlob.size());
747+
try (BlobReadChannel reader = storage.reader(blob.blobId())) {
748+
reader.chunkSize(chunkSize);
749+
ByteBuffer readBytes = ByteBuffer.allocate(chunkSize);
750+
int numReadBytes = reader.read(readBytes);
751+
assertEquals(chunkSize, numReadBytes);
752+
assertArrayEquals(Arrays.copyOf(content, chunkSize), readBytes.array());
753+
try (BlobWriteChannel writer = storage.writer(blob)) {
754+
byte[] newContent = new byte[blobSize];
755+
random.nextBytes(newContent);
756+
int numWrittenBytes = writer.write(ByteBuffer.wrap(newContent));
757+
assertEquals(blobSize, numWrittenBytes);
758+
}
759+
readBytes = ByteBuffer.allocate(chunkSize);
760+
reader.read(readBytes);
761+
fail("StorageException was expected");
762+
} catch(StorageException ex) {
763+
StringBuilder messageBuilder = new StringBuilder();
764+
messageBuilder.append("Blob ").append(blob.blobId()).append(" was updated while reading");
765+
assertEquals(messageBuilder.toString(), ex.getMessage());
766+
}
767+
assertTrue(storage.delete(BUCKET, blobName));
768+
}
769+
734770
@Test
735771
public void testWriteChannelFail() throws IOException {
736772
String blobName = "test-write-channel-blob-fail";

gcloud-java-storage/src/test/java/com/google/gcloud/storage/StorageImplTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,7 +1030,7 @@ public void testReaderWithOptions() throws IOException {
10301030
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
10311031
EasyMock.expect(
10321032
storageRpcMock.read(BLOB_INFO2.toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
1033-
.andReturn(result);
1033+
.andReturn(StorageRpc.Tuple.of("etag", result));
10341034
EasyMock.replay(storageRpcMock);
10351035
storage = options.service();
10361036
BlobReadChannel channel = storage.reader(BUCKET_NAME1, BLOB_NAME2, BLOB_SOURCE_GENERATION,
@@ -1045,7 +1045,7 @@ public void testReaderWithOptionsFromBlobId() throws IOException {
10451045
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
10461046
EasyMock.expect(
10471047
storageRpcMock.read(BLOB_INFO1.blobId().toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
1048-
.andReturn(result);
1048+
.andReturn(StorageRpc.Tuple.of("etag", result));
10491049
EasyMock.replay(storageRpcMock);
10501050
storage = options.service();
10511051
BlobReadChannel channel = storage.reader(BLOB_INFO1.blobId(),

0 commit comments

Comments
 (0)