Skip to content

Commit 067a190

Browse files
committed
Make integration tests work
1 parent f208590 commit 067a190

5 files changed

Lines changed: 40 additions & 15 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.cloud.spanner.SpannerImpl.MultiUseReadOnlyTransaction;
2424
import com.google.cloud.spanner.SpannerImpl.SessionImpl;
2525
import com.google.cloud.spanner.spi.v1.SpannerRpc;
26+
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
2627
import com.google.common.base.Preconditions;
2728
import com.google.common.collect.ImmutableList;
2829
import com.google.protobuf.Struct;
@@ -68,7 +69,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
6869
super(
6970
checkNotNull(session),
7071
checkNotNull(bound),
71-
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
72+
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
7273
spanner.getOptions().getPrefetchChunks());
7374
this.sessionName = session.getName();
7475
this.options = session.getOptions();
@@ -81,7 +82,7 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
8182
checkNotNull(session),
8283
checkNotNull(batchTransactionId).getTransactionId(),
8384
batchTransactionId.getTimestamp(),
84-
checkNotNull(spanner).getOptions().getSpannerRpcV1(),
85+
checkNotNull(spanner).getOptions().getGapicSpannerRpc(),
8586
spanner.getOptions().getPrefetchChunks());
8687
this.sessionName = session.getName();
8788
this.options = session.getOptions();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
165165
SpannerImpl(SpannerOptions options) {
166166
this(
167167
options.getSpannerRpcV1(),
168-
GapicSpannerRpc.create(options),
168+
options.getGapicSpannerRpc(),
169169
options.getPrefetchChunks(),
170170
options);
171171
}
@@ -1063,7 +1063,9 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
10631063
null,
10641064
session.options));
10651065

1066-
// let resume fail for now
1066+
// Let resume fail for now. Gapic has its own resume, but in order not
1067+
// to introduce too much change at a time, we decide to plumb up
1068+
// ServerStream first and then figure out how to make resume work
10671069
}
10681070
};
10691071
return new GrpcResultSet(stream, this, queryMode);
@@ -1170,7 +1172,9 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
11701172
null,
11711173
session.options));
11721174

1173-
// let resume fail for now
1175+
// Let resume fail for now. Gapic has its own resume, but in order not
1176+
// to introduce too much change at a time, we decide to plumb up
1177+
// ServerStream first and then figure out how to make resume work
11741178
}
11751179
};
11761180
GrpcResultSet resultSet =
@@ -2288,17 +2292,32 @@ public CloseableServerStreamIterator(ServerStream<T> stream) {
22882292

22892293
@Override
22902294
public boolean hasNext() {
2291-
return iterator.hasNext();
2295+
try {
2296+
return iterator.hasNext();
2297+
}
2298+
catch (Exception e) {
2299+
throw SpannerExceptionFactory.newSpannerException(e);
2300+
}
22922301
}
22932302

22942303
@Override
22952304
public T next() {
2296-
return iterator.next();
2305+
try {
2306+
return iterator.next();
2307+
}
2308+
catch (Exception e) {
2309+
throw SpannerExceptionFactory.newSpannerException(e);
2310+
}
22972311
}
22982312

22992313
@Override
23002314
public void close(@Nullable String message) {
2301-
stream.cancel();
2315+
try {
2316+
stream.cancel();
2317+
}
2318+
catch (Exception e) {
2319+
throw SpannerExceptionFactory.newSpannerException(e);
2320+
}
23022321
}
23032322
}
23042323

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.ServiceOptions;
2222
import com.google.cloud.ServiceRpc;
2323
import com.google.cloud.TransportOptions;
24+
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
2425
import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc;
2526
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2627
import com.google.cloud.spanner.spi.SpannerRpcFactory;
@@ -343,6 +344,10 @@ protected SpannerRpc getSpannerRpcV1() {
343344
return (SpannerRpc) getRpc();
344345
}
345346

347+
protected SpannerRpc getGapicSpannerRpc() {
348+
return GapicSpannerRpc.create(this);
349+
}
350+
346351
@SuppressWarnings("unchecked")
347352
@Override
348353
public Builder toBuilder() {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,7 @@ public ServerStream<PartialResultSet> read(
402402
@Override
403403
public ServerStream<PartialResultSet> executeQuery(
404404
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
405-
GrpcCallContext context = GrpcCallContext.createDefault()
406-
.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
405+
GrpcCallContext context = newCallContext(options, request.getSession());
407406
return stub.executeStreamingSqlCallable().call(request, context);
408407
}
409408

google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public final class BatchClientImplTest {
4848
private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn");
4949
private static final String TIMESTAMP = "2017-11-15T10:54:20Z";
5050

51-
@Mock private SpannerRpc rpc;
51+
@Mock private SpannerRpc rawGrpcRpc;
52+
@Mock private SpannerRpc gapicRpc;
5253
@Mock private SpannerOptions spannerOptions;
5354
@Captor private ArgumentCaptor<Map<SpannerRpc.Option, Object>> optionsCaptor;
5455
@Mock private BatchTransactionId txnID;
@@ -59,20 +60,20 @@ public final class BatchClientImplTest {
5960
public void setUp() {
6061
initMocks(this);
6162
DatabaseId db = DatabaseId.of(DB_NAME);
62-
SpannerImpl spanner = new SpannerImpl(rpc, rpc, 1, spannerOptions);
63+
SpannerImpl spanner = new SpannerImpl(rawGrpcRpc, gapicRpc, 1, spannerOptions);
6364
client = new BatchClientImpl(db, spanner);
6465
}
6566

6667
@Test
6768
public void testBatchReadOnlyTxnWithBound() throws Exception {
6869
Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build();
69-
when(rpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
70+
when(gapicRpc.createSession(eq(DB_NAME), (Map<String, String>) anyMap(), optionsCaptor.capture()))
7071
.thenReturn(sessionProto);
7172
com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP);
7273
Transaction txnMetadata =
7374
Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build();
74-
when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc);
75-
when(rpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
75+
when(spannerOptions.getGapicSpannerRpc()).thenReturn(gapicRpc);
76+
when(gapicRpc.beginTransaction(Mockito.<BeginTransactionRequest>any(), optionsCaptor.capture()))
7677
.thenReturn(txnMetadata);
7778

7879
BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong());

0 commit comments

Comments
 (0)