Skip to content

Commit 02f0bc7

Browse files
authored
Spanner: migrate streaming methods to gapic (#3139)
* Make all the unit tests work. ITs are failing pending on another PR. * Make integration tests work * Documentation and format
1 parent 45e76f3 commit 02f0bc7

9 files changed

Lines changed: 227 additions & 64 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction;
2424
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
2525
import com.google.cloud.spanner.spi.v1.SpannerRpc;
26+
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
2627
import com.google.common.base.Preconditions;
2728
import com.google.common.collect.ImmutableList;
2829
import com.google.protobuf.Struct;
@@ -68,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
6869
super(
6970
checkNotNull(session),
7071
checkNotNull(bound),
71-
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
72+
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
7273
spanner.getOptions().getPrefetchChunks());
7374
this.sessionName = session.getName();
7475
this.options = session.getOptions();
@@ -81,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
8182
checkNotNull(session),
8283
checkNotNull(batchTransactionId).getTransactionId(),
8384
batchTransactionId.getTimestamp(),
84-
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
85+
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
8586
spanner.getOptions().getPrefetchChunks());
8687
this.sessionName = session.getName();
8788
this.options = session.getOptions();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 68 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.api.client.util.BackOff;
2626
import com.google.api.client.util.ExponentialBackOff;
2727
import com.google.api.gax.paging.Page;
28+
import com.google.api.gax.rpc.ServerStream;
2829
import com.google.api.pathtemplate.PathTemplate;
2930
import com.google.cloud.BaseService;
3031
import com.google.cloud.ByteArray;
@@ -164,7 +165,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
164165
SpannerImpl(SpannerOptions options) {
165166
this(
166167
options.getSpannerRpcV1(),
167-
GapicSpannerRpc.create(options),
168+
options.getGapicSpannerRpc(),
168169
options.getPrefetchChunks(),
169170
options);
170171
}
@@ -828,7 +829,7 @@ public ReadContext singleUse() {
828829

829830
@Override
830831
public ReadContext singleUse(TimestampBound bound) {
831-
return setActive(new SingleReadContext(this, bound, rawGrpcRpc, defaultPrefetchChunks));
832+
return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks));
832833
}
833834

834835
@Override
@@ -839,7 +840,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() {
839840
@Override
840841
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
841842
return setActive(
842-
new SingleUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks));
843+
new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
843844
}
844845

845846
@Override
@@ -850,12 +851,12 @@ public ReadOnlyTransaction readOnlyTransaction() {
850851
@Override
851852
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
852853
return setActive(
853-
new MultiUseReadOnlyTransaction(this, bound, rawGrpcRpc, defaultPrefetchChunks));
854+
new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks));
854855
}
855856

856857
@Override
857858
public TransactionRunner readWriteTransaction() {
858-
return setActive(new TransactionRunnerImpl(this, rawGrpcRpc, defaultPrefetchChunks));
859+
return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks));
859860
}
860861

861862
@Override
@@ -1055,20 +1056,18 @@ ResultSet executeQueryInternalWithOptions(
10551056
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
10561057
@Override
10571058
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
1058-
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
1059-
SpannerRpc.StreamingCall call =
1059+
return new CloseableServerStreamIterator<PartialResultSet>(
10601060
rpc.executeQuery(
10611061
resumeToken == null
10621062
? request
10631063
: request.toBuilder().setResumeToken(resumeToken).build(),
1064-
stream.consumer(),
1065-
session.options);
1066-
// We get one message for free.
1067-
if (prefetchChunks > 1) {
1068-
call.request(prefetchChunks - 1);
1069-
}
1070-
stream.setCall(call);
1071-
return stream;
1064+
null,
1065+
session.options));
1066+
1067+
// TODO(hzyi): make resume work
1068+
// Let resume fail for now. Gapic has its own resume, but in order not
1069+
// to introduce too much change at a time, we decide to plumb up
1070+
// ServerStream first and then figure out how to make resume work
10721071
}
10731072
};
10741073
return new GrpcResultSet(stream, this, queryMode);
@@ -1168,20 +1167,18 @@ ResultSet readInternalWithOptions(
11681167
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
11691168
@Override
11701169
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
1171-
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
1172-
SpannerRpc.StreamingCall call =
1170+
return new CloseableServerStreamIterator<PartialResultSet>(
11731171
rpc.read(
11741172
resumeToken == null
11751173
? request
11761174
: request.toBuilder().setResumeToken(resumeToken).build(),
1177-
stream.consumer(),
1178-
session.options);
1179-
// We get one message for free.
1180-
if (prefetchChunks > 1) {
1181-
call.request(prefetchChunks - 1);
1182-
}
1183-
stream.setCall(call);
1184-
return stream;
1175+
null,
1176+
session.options));
1177+
1178+
// TODO(hzyi): make resume work
1179+
// Let resume fail for now. Gapic has its own resume, but in order not
1180+
// to introduce too much change at a time, we decide to plumb up
1181+
// ServerStream first and then figure out how to make resume work
11851182
}
11861183
};
11871184
GrpcResultSet resultSet =
@@ -2287,6 +2284,52 @@ interface CloseableIterator<T> extends Iterator<T> {
22872284
void close(@Nullable String message);
22882285
}
22892286

2287+
private static final class CloseableServerStreamIterator<T> implements CloseableIterator<T> {
2288+
2289+
private final ServerStream<T> stream;
2290+
private final Iterator<T> iterator;
2291+
2292+
public CloseableServerStreamIterator(ServerStream<T> stream) {
2293+
this.stream = stream;
2294+
this.iterator = stream.iterator();
2295+
}
2296+
2297+
@Override
2298+
public boolean hasNext() {
2299+
try {
2300+
return iterator.hasNext();
2301+
}
2302+
catch (Exception e) {
2303+
throw SpannerExceptionFactory.newSpannerException(e);
2304+
}
2305+
}
2306+
2307+
@Override
2308+
public T next() {
2309+
try {
2310+
return iterator.next();
2311+
}
2312+
catch (Exception e) {
2313+
throw SpannerExceptionFactory.newSpannerException(e);
2314+
}
2315+
}
2316+
2317+
@Override
2318+
public void remove() {
2319+
throw new UnsupportedOperationException("Not supported: remove.");
2320+
}
2321+
2322+
@Override
2323+
public void close(@Nullable String message) {
2324+
try {
2325+
stream.cancel();
2326+
}
2327+
catch (Exception e) {
2328+
throw SpannerExceptionFactory.newSpannerException(e);
2329+
}
2330+
}
2331+
}
2332+
22902333
/** Adapts a streaming read/query call into an iterator over partial result sets. */
22912334
@VisibleForTesting
22922335
static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.ServiceOptions;
2222
import com.google.cloud.ServiceRpc;
2323
import com.google.cloud.TransportOptions;
24+
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
2425
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
2526
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2627
import com.google.cloud.spanner.spi.SpannerRpcFactory;
@@ -343,6 +344,10 @@ protected SpannerRpc getSpannerRpcV1() {
343344
return (SpannerRpc) getRpc();
344345
}
345346

347+
protected SpannerRpc getGapicSpannerRpc() {
348+
return GapicSpannerRpc.create(this);
349+
}
350+
346351
@SuppressWarnings("unchecked")
347352
@Override
348353
public Builder toBuilder() {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.api.gax.rpc.ApiClientHeaderProvider;
2828
import com.google.api.gax.rpc.FixedTransportChannelProvider;
2929
import com.google.api.gax.rpc.HeaderProvider;
30+
import com.google.api.gax.rpc.ServerStream;
3031
import com.google.api.gax.rpc.StatusCode;
3132
import com.google.api.gax.rpc.TransportChannelProvider;
3233
import com.google.api.gax.rpc.UnaryCallSettings;
@@ -79,6 +80,7 @@
7980
import com.google.spanner.v1.PartitionQueryRequest;
8081
import com.google.spanner.v1.PartitionReadRequest;
8182
import com.google.spanner.v1.PartitionResponse;
83+
import com.google.spanner.v1.PartialResultSet;
8284
import com.google.spanner.v1.ReadRequest;
8385
import com.google.spanner.v1.RollbackRequest;
8486
import com.google.spanner.v1.Session;
@@ -391,16 +393,17 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
391393
}
392394

393395
@Override
394-
public StreamingCall read(
396+
public ServerStream<PartialResultSet> read(
395397
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
396398
GrpcCallContext context = newCallContext(options, request.getSession());
397-
throw new UnsupportedOperationException("not implemented yet");
399+
return stub.streamingReadCallable().call(request, context);
398400
}
399401

400402
@Override
401-
public StreamingCall executeQuery(
403+
public ServerStream<PartialResultSet> executeQuery(
402404
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
403-
throw new UnsupportedOperationException("Not implemented yet.");
405+
GrpcCallContext context = newCallContext(options, request.getSession());
406+
return stub.executeStreamingSqlCallable().call(request, context);
404407
}
405408

406409
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.gax.grpc.GaxGrpcProperties;
2323
import com.google.api.gax.rpc.ApiClientHeaderProvider;
2424
import com.google.api.gax.rpc.HeaderProvider;
25+
import com.google.api.gax.rpc.ServerStream;
2526
import com.google.api.pathtemplate.PathTemplate;
2627
import com.google.cloud.NoCredentials;
2728
import com.google.cloud.ServiceOptions;
@@ -366,25 +367,15 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
366367
}
367368

368369
@Override
369-
public StreamingCall read(
370+
public ServerStream<PartialResultSet> read(
370371
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
371-
return doStreamingCall(
372-
SpannerGrpc.METHOD_STREAMING_READ,
373-
request,
374-
consumer,
375-
request.getSession(),
376-
Option.CHANNEL_HINT.getLong(options));
372+
throw new UnsupportedOperationException("Not implemented: read");
377373
}
378374

379375
@Override
380-
public StreamingCall executeQuery(
376+
public ServerStream<PartialResultSet> executeQuery(
381377
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
382-
return doStreamingCall(
383-
SpannerGrpc.METHOD_EXECUTE_STREAMING_SQL,
384-
request,
385-
consumer,
386-
request.getSession(),
387-
Option.CHANNEL_HINT.getLong(options));
378+
throw new UnsupportedOperationException("Not implemented: executeQuery");
388379
}
389380

390381
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner.spi.v1;
1818

19+
import com.google.api.gax.rpc.ServerStream;
1920
import com.google.cloud.ServiceRpc;
2021
import com.google.cloud.spanner.SpannerException;
2122
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
@@ -197,10 +198,10 @@ Session createSession(String databaseName, @Nullable Map<String, String> labels,
197198

198199
void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;
199200

200-
StreamingCall read(
201+
ServerStream<PartialResultSet> read(
201202
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
202203

203-
StreamingCall executeQuery(
204+
ServerStream<PartialResultSet> executeQuery(
204205
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
205206

206207
Transaction beginTransaction(BeginTransactionRequest request, @Nullable Map<Option, ?> options)

google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public final class BatchClientImplTest {
4848
private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn");
4949
private static final String TIMESTAMP = "2017-11-15T10:54:20Z";
5050

51-
@Mock private SpannerRpc rpc;
51+
@Mock private SpannerRpc rawGrpcRpc;
52+
@Mock private SpannerRpc gapicRpc;
5253
@Mock private SpannerOptions spannerOptions;
5354
@Captor private ArgumentCaptor<Map<SpannerRpc.Option, Object>> optionsCaptor;
5455
@Mock private BatchTransactionId txnID;
@@ -59,20 +60,20 @@ public final class BatchClientImplTest {
5960
public void setUp() {
6061
initMocks(this);
6162
DatabaseId db = DatabaseId.of(DB_NAME);
62-
SpannerImpl spanner = new SpannerImpl(rpc, rpc, 1, spannerOptions);
63+
SpannerImpl spanner = new SpannerImpl(rawGrpcRpc, gapicRpc, 1, spannerOptions);
6364
client = new BatchClientImpl(db, spanner);
6465
}
6566

6667
@Test
6768
public void testBatchReadOnlyTxnWithBound() throws Exception {
6869
Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build();
69-
when(rpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
70+
when(gapicRpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
7071
.thenReturn(sessionProto);
7172
com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP);
7273
Transaction txnMetadata =
7374
Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build();
74-
when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc);
75-
when(rpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
75+
when(spannerOptions.getGapicSpannerRpc()).thenReturn(gapicRpc);
76+
when(gapicRpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
7677
.thenReturn(txnMetadata);
7778

7879
BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong());

0 commit comments

Comments
 (0)