mergedList = new ArrayList<>();
+ mergedList.addAll(last.getListValue().getValuesList());
+ concatLists(mergedList, first.getListValue().getValuesList());
+ merged =
+ com.google.protobuf.Value.newBuilder()
+ .setListValue(ListValue.newBuilder().addAllValues(mergedList))
+ .build();
+ }
+ a.set(a.size() - 1, merged);
+ a.addAll(b.subList(1, b.size()));
+ } else {
+ a.addAll(b);
+ }
+ }
+ }
+
+ private boolean isMergeable(KindCase kind) {
+ return kind == KindCase.STRING_VALUE || kind == KindCase.LIST_VALUE;
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
index 57feabbfcca..76d0f24225a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
@@ -243,6 +243,10 @@ public static ReadAndQueryOption directedRead(DirectedReadOptions directedReadOp
return new DirectedReadOption(directedReadOptions);
}
+ public static ReadAndQueryOption decodeMode(DecodeMode decodeMode) {
+ return new DecodeOption(decodeMode);
+ }
+
/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
@@ -374,6 +378,19 @@ void appendToOptions(Options options) {
}
}
+ static final class DecodeOption extends InternalOption implements ReadAndQueryOption {
+ private final DecodeMode decodeMode;
+
+ DecodeOption(DecodeMode decodeMode) {
+ this.decodeMode = Preconditions.checkNotNull(decodeMode, "DecodeMode cannot be null");
+ }
+
+ @Override
+ void appendToOptions(Options options) {
+ options.decodeMode = decodeMode;
+ }
+ }
+
private boolean withCommitStats;
private Duration maxCommitDelay;
@@ -391,6 +408,7 @@ void appendToOptions(Options options) {
private Boolean withOptimisticLock;
private Boolean dataBoostEnabled;
private DirectedReadOptions directedReadOptions;
+ private DecodeMode decodeMode;
// Construction is via factory methods below.
private Options() {}
@@ -507,6 +525,14 @@ DirectedReadOptions directedReadOptions() {
return directedReadOptions;
}
+ boolean hasDecodeMode() {
+ return decodeMode != null;
+ }
+
+ DecodeMode decodeMode() {
+ return decodeMode;
+ }
+
@Override
public String toString() {
StringBuilder b = new StringBuilder();
@@ -552,6 +578,9 @@ public String toString() {
if (directedReadOptions != null) {
b.append("directedReadOptions: ").append(directedReadOptions).append(' ');
}
+ if (decodeMode != null) {
+ b.append("decodeMode: ").append(decodeMode).append(' ');
+ }
return b.toString();
}
@@ -640,6 +669,9 @@ public int hashCode() {
if (directedReadOptions != null) {
result = 31 * result + directedReadOptions.hashCode();
}
+ if (decodeMode != null) {
+ result = 31 * result + decodeMode.hashCode();
+ }
return result;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ProtobufResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ProtobufResultSet.java
new file mode 100644
index 00000000000..bbd8c41291f
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ProtobufResultSet.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.api.core.InternalApi;
+import com.google.protobuf.Value;
+
+/** Interface for {@link ResultSet}s that can return a protobuf value. */
+@InternalApi
+public interface ProtobufResultSet extends ResultSet {
+
+ /** Returns true if the protobuf value for the given column is still available. */
+ boolean canGetProtobufValue(int columnIndex);
+
+ /**
+ * Returns the column value as a protobuf value.
+ *
+ * <p>This is an internal method not intended for external usage.
+ *
+ *
+ * <p>This method may only be called before the column value has been decoded to a plain Java
+ * object. This means that the {@link DecodeMode} that is used for the {@link ResultSet} must be
+ * one of {@link DecodeMode#LAZY_PER_ROW} and {@link DecodeMode#LAZY_PER_COL}, and that the
+ * corresponding {@link ResultSet#getValue(int)}, {@link ResultSet#getBoolean(int)}, ... method
+ * may not yet have been called for the column.
+ */
+ @InternalApi
+ Value getProtobufValue(int columnIndex);
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
index d55d4091b9f..a6cc7c729e5 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSets.java
@@ -109,7 +109,7 @@ public ResultSet get() {
}
}
- private static class PrePopulatedResultSet implements ResultSet {
+ private static class PrePopulatedResultSet implements ProtobufResultSet {
private final List<Struct> rows;
private final Type type;
private int index = -1;
@@ -137,6 +137,19 @@ public boolean next() throws SpannerException {
return ++index < rows.size();
}
+ @Override
+ public boolean canGetProtobufValue(int columnIndex) {
+ return !closed && index >= 0 && index < rows.size();
+ }
+
+ @Override
+ public com.google.protobuf.Value getProtobufValue(int columnIndex) {
+ Preconditions.checkState(!closed, "ResultSet is closed");
+ Preconditions.checkState(index >= 0, "Must be preceded by a next() call");
+ Preconditions.checkElementIndex(index, rows.size(), "All rows have been yielded");
+ return getValue(columnIndex).toProto();
+ }
+
@Override
public Struct getCurrentRowAsStruct() {
Preconditions.checkState(!closed, "ResultSet is closed");
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
new file mode 100644
index 00000000000..590797c0999
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
+import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
+import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import com.google.protobuf.ByteString;
+import com.google.spanner.v1.PartialResultSet;
+import io.grpc.Context;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+/**
+ * Wraps an iterator over partial result sets, supporting resuming RPCs on error. This class keeps
+ * track of the most recent resume token seen, and will buffer partial result set chunks that do not
+ * have a resume token until one is seen or buffer space is exceeded, which reduces the chance of
+ * yielding data to the caller that cannot be resumed.
+ */
+@VisibleForTesting
+abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet>
+    implements CloseableIterator<PartialResultSet> {
+ private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
+ SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
+ private final RetrySettings streamingRetrySettings;
+ private final Set<Code> retryableCodes;
+ private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
+ private final BackOff backOff;
+ private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
+ private final int maxBufferSize;
+ private final ISpan span;
+ private final TraceWrapper tracer;
+ private CloseableIterator<PartialResultSet> stream;
+ private ByteString resumeToken;
+ private boolean finished;
+ /**
+ * Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
+ * reached the maximum buffer size without seeing a restart token; in this case, we will drain the
+ * buffer and remain in this state until we see a new restart token.
+ */
+ private boolean safeToRetry = true;
+
+ protected ResumableStreamIterator(
+ int maxBufferSize,
+ String streamName,
+ ISpan parent,
+ TraceWrapper tracer,
+ RetrySettings streamingRetrySettings,
+ Set<Code> retryableCodes) {
+ checkArgument(maxBufferSize >= 0);
+ this.maxBufferSize = maxBufferSize;
+ this.tracer = tracer;
+ this.span = tracer.spanBuilderWithExplicitParent(streamName, parent);
+ this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
+ this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
+ this.backOff = newBackOff();
+ }
+
+ private ExponentialBackOff newBackOff() {
+ if (Objects.equals(streamingRetrySettings, DEFAULT_STREAMING_RETRY_SETTINGS)) {
+ return new ExponentialBackOff.Builder()
+ .setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
+ .setInitialIntervalMillis(
+ Math.max(10, (int) streamingRetrySettings.getInitialRetryDelay().toMillis()))
+ .setMaxIntervalMillis(
+ Math.max(1000, (int) streamingRetrySettings.getMaxRetryDelay().toMillis()))
+ .setMaxElapsedTimeMillis(Integer.MAX_VALUE) // Prevent Backoff.STOP from getting returned.
+ .build();
+ }
+ return new ExponentialBackOff.Builder()
+ .setMultiplier(streamingRetrySettings.getRetryDelayMultiplier())
+ // All of these values must be > 0.
+ .setInitialIntervalMillis(
+ Math.max(
+ 1,
+ (int)
+ Math.min(
+ streamingRetrySettings.getInitialRetryDelay().toMillis(),
+ Integer.MAX_VALUE)))
+ .setMaxIntervalMillis(
+ Math.max(
+ 1,
+ (int)
+ Math.min(
+ streamingRetrySettings.getMaxRetryDelay().toMillis(), Integer.MAX_VALUE)))
+ .setMaxElapsedTimeMillis(
+ Math.max(
+ 1,
+ (int)
+ Math.min(
+ streamingRetrySettings.getTotalTimeout().toMillis(), Integer.MAX_VALUE)))
+ .build();
+ }
+
+ private void backoffSleep(Context context, BackOff backoff) throws SpannerException {
+ backoffSleep(context, nextBackOffMillis(backoff));
+ }
+
+ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
+ try {
+ return backoff.nextBackOffMillis();
+ } catch (IOException e) {
+ throw newSpannerException(ErrorCode.INTERNAL, e.getMessage(), e);
+ }
+ }
+
+ private void backoffSleep(Context context, long backoffMillis) throws SpannerException {
+ tracer.getCurrentSpan().addAnnotation("Backing off", "Delay", backoffMillis);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Context.CancellationListener listener =
+ ignored -> {
+ // Wakeup on cancellation / DEADLINE_EXCEEDED.
+ latch.countDown();
+ };
+
+ context.addListener(listener, DirectExecutor.INSTANCE);
+ try {
+ if (backoffMillis == BackOff.STOP) {
+ // Highly unlikely but we handle it just in case.
+ backoffMillis = streamingRetrySettings.getMaxRetryDelay().toMillis();
+ }
+ if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
+ // Woken by context cancellation.
+ throw newSpannerExceptionForCancellation(context, null);
+ }
+ } catch (InterruptedException interruptExcept) {
+ throw newSpannerExceptionForCancellation(context, interruptExcept);
+ } finally {
+ context.removeListener(listener);
+ }
+ }
+
+ private enum DirectExecutor implements Executor {
+ INSTANCE;
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+ }
+
+ abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken);
+
+ @Override
+ public void close(@Nullable String message) {
+ if (stream != null) {
+ stream.close(message);
+ span.end();
+ stream = null;
+ }
+ }
+
+ @Override
+ public boolean isWithBeginTransaction() {
+ return stream != null && stream.isWithBeginTransaction();
+ }
+
+ @Override
+ protected PartialResultSet computeNext() {
+ Context context = Context.current();
+ while (true) {
+ // Eagerly start stream before consuming any buffered items.
+ if (stream == null) {
+ span.addAnnotation(
+ "Starting/Resuming stream",
+ "ResumeToken",
+ resumeToken == null ? "null" : resumeToken.toStringUtf8());
+ try (IScope scope = tracer.withSpan(span)) {
+ // When start a new stream set the Span as current to make the gRPC Span a child of
+ // this Span.
+ stream = checkNotNull(startStream(resumeToken));
+ }
+ }
+ // Buffer contains items up to a resume token or has reached capacity: flush.
+ if (!buffer.isEmpty()
+ && (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) {
+ return buffer.pop();
+ }
+ try {
+ if (stream.hasNext()) {
+ PartialResultSet next = stream.next();
+ boolean hasResumeToken = !next.getResumeToken().isEmpty();
+ if (hasResumeToken) {
+ resumeToken = next.getResumeToken();
+ safeToRetry = true;
+ }
+ // If the buffer is empty and this chunk has a resume token or we cannot resume safely
+ // anyway, we can yield it immediately rather than placing it in the buffer to be
+ // returned on the next iteration.
+ if ((hasResumeToken || !safeToRetry) && buffer.isEmpty()) {
+ return next;
+ }
+ buffer.add(next);
+ if (buffer.size() > maxBufferSize && buffer.getLast().getResumeToken().isEmpty()) {
+ // We need to flush without a restart token. Errors encountered until we see
+ // such a token will fail the read.
+ safeToRetry = false;
+ }
+ } else {
+ finished = true;
+ if (buffer.isEmpty()) {
+ endOfData();
+ return null;
+ }
+ }
+ } catch (SpannerException spannerException) {
+ if (safeToRetry && isRetryable(spannerException)) {
+ span.addAnnotation("Stream broken. Safe to retry", spannerException);
+ logger.log(Level.FINE, "Retryable exception, will sleep and retry", spannerException);
+ // Truncate any items in the buffer before the last retry token.
+ while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {
+ buffer.removeLast();
+ }
+ assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
+ stream = null;
+ try (IScope s = tracer.withSpan(span)) {
+ long delay = spannerException.getRetryDelayInMillis();
+ if (delay != -1) {
+ backoffSleep(context, delay);
+ } else {
+ backoffSleep(context, backOff);
+ }
+ }
+
+ continue;
+ }
+ span.addAnnotation("Stream broken. Not safe to retry", spannerException);
+ span.setStatus(spannerException);
+ throw spannerException;
+ } catch (RuntimeException e) {
+ span.addAnnotation("Stream broken. Not safe to retry", e);
+ span.setStatus(e);
+ throw e;
+ }
+ }
+ }
+
+ boolean isRetryable(SpannerException spannerException) {
+ return spannerException.isRetryable()
+ || retryableCodes.contains(
+ GrpcStatusCode.of(spannerException.getErrorCode().getGrpcStatusCode()).getCode());
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index 29928f61cec..81b00001105 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -263,6 +263,7 @@ public ReadContext singleUse(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setTracer(tracer)
@@ -284,6 +285,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setTracer(tracer)
@@ -305,6 +307,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions())
.setSpan(currentSpan)
.setTracer(tracer)
@@ -423,6 +426,7 @@ TransactionContextImpl newTransaction(Options options) {
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId))
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
+ .setDefaultDecodeMode(spanner.getDefaultDecodeMode())
.setSpan(currentSpan)
.setTracer(tracer)
.setExecutorProvider(spanner.getAsyncExecutorProvider())
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
index 52c35cb7130..7ccbc88d978 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Spanner.java
@@ -26,7 +26,12 @@
* quota.
*/
public interface Spanner extends Service<SpannerOptions>, AutoCloseable {
- /** Returns a {@code DatabaseAdminClient} to do admin operations on Cloud Spanner databases. */
+
+ /**
+ * Returns a {@code DatabaseAdminClient} to execute admin operations on Cloud Spanner databases.
+ *
+ * @return {@code DatabaseAdminClient}
+ */
/*
*
* {@code
@@ -38,7 +43,34 @@ public interface Spanner extends Service, AutoCloseable {
*/
DatabaseAdminClient getDatabaseAdminClient();
- /** Returns an {@code InstanceAdminClient} to do admin operations on Cloud Spanner instances. */
+ /**
+ * Returns a {@link com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient} to execute
+ * admin operations on Cloud Spanner databases. This method always creates a new instance of
+ * {@link com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient} which is an {@link
+ * AutoCloseable} resource. For optimising the number of clients, caller may choose to cache the
+ * clients instead of repeatedly invoking this method and creating new instances.
+ *
+ * @return {@link com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient}
+ */
+ /*
+ *
+ * {@code
+ * SpannerOptions options = SpannerOptions.newBuilder().build();
+ * Spanner spanner = options.getService();
+ * DatabaseAdminClient dbAdminClient = spanner.createDatabaseAdminClient();
+ * }
+ *
+ */
+ default com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient
+ createDatabaseAdminClient() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
+ * Returns an {@code InstanceAdminClient} to execute admin operations on Cloud Spanner instances.
+ *
+ * @return {@code InstanceAdminClient}
+ */
/*
*
* {@code
@@ -50,6 +82,29 @@ public interface Spanner extends Service, AutoCloseable {
*/
InstanceAdminClient getInstanceAdminClient();
+ /**
+ * Returns a {@link com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient} to execute
+ * admin operations on Cloud Spanner databases. This method always creates a new instance of
+ * {@link com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient} which is an {@link
+ * AutoCloseable} resource. For optimising the number of clients, caller may choose to cache the
+ * clients instead of repeatedly invoking this method and creating new instances.
+ *
+ * @return {@link com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient}
+ */
+ /*
+ *
+ * {@code
+ * SpannerOptions options = SpannerOptions.newBuilder().build();
+ * Spanner spanner = options.getService();
+ * InstanceAdminClient instanceAdminClient = spanner.createInstanceAdminClient();
+ * }
+ *
+ */
+ default com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient
+ createInstanceAdminClient() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
/**
* Returns a {@code DatabaseClient} for the given database. It uses a pool of sessions to talk to
* the database.
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
index 326a51d803e..2ab75d74174 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java
@@ -25,6 +25,8 @@
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.SpannerOptions.CloseableExecutorProvider;
+import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
+import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated;
@@ -40,6 +42,7 @@
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -151,6 +154,10 @@ int getDefaultPrefetchChunks() {
return getOptions().getPrefetchChunks();
}
+ DecodeMode getDefaultDecodeMode() {
+ return getOptions().getDecodeMode();
+ }
+
/** Returns the default query options that should be used for the specified database. */
QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
return getOptions().getDefaultQueryOptions(databaseId);
@@ -203,11 +210,37 @@ public DatabaseAdminClient getDatabaseAdminClient() {
return dbAdminClient;
}
+ @Override
+ public com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient
+ createDatabaseAdminClient() {
+ try {
+ final DatabaseAdminStubSettings settings =
+ Preconditions.checkNotNull(gapicRpc.getDatabaseAdminStubSettings());
+ return com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient.create(
+ settings.createStub());
+ } catch (IOException ex) {
+ throw SpannerExceptionFactory.newSpannerException(ex);
+ }
+ }
+
@Override
public InstanceAdminClient getInstanceAdminClient() {
return instanceClient;
}
+ @Override
+ public com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient
+ createInstanceAdminClient() {
+ try {
+ final InstanceAdminStubSettings settings =
+ Preconditions.checkNotNull(gapicRpc.getInstanceAdminStubSettings());
+ return com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient.create(
+ settings.createStub());
+ } catch (IOException ex) {
+ throw SpannerExceptionFactory.newSpannerException(ex);
+ }
+ }
+
@Override
public DatabaseClient getDatabaseClient(DatabaseId db) {
synchronized (this) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index 9c6044aa938..a16be179ce3 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final GrpcInterceptorProvider interceptorProvider;
private final SessionPoolOptions sessionPoolOptions;
private final int prefetchChunks;
+ private final DecodeMode decodeMode;
private final int numChannels;
private final String transportChannelExecutorThreadNameFormat;
private final String databaseRole;
@@ -616,6 +617,7 @@ protected SpannerOptions(Builder builder) {
? builder.sessionPoolOptions
: SessionPoolOptions.newBuilder().build();
prefetchChunks = builder.prefetchChunks;
+ decodeMode = builder.decodeMode;
databaseRole = builder.databaseRole;
sessionLabels = builder.sessionLabels;
try {
@@ -704,6 +706,9 @@ public static class Builder
extends ServiceOptions.Builder<Spanner, SpannerOptions, Builder> {
static final int DEFAULT_PREFETCH_CHUNKS = 4;
static final QueryOptions DEFAULT_QUERY_OPTIONS = QueryOptions.getDefaultInstance();
+ // TODO: Set the default to DecodeMode.DIRECT before merging to keep the current default.
+ // It is currently set to LAZY_PER_COL so it is used in all tests.
+ static final DecodeMode DEFAULT_DECODE_MODE = DecodeMode.LAZY_PER_COL;
static final RetrySettings DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofSeconds(5L))
@@ -730,6 +735,7 @@ public static class Builder
private String transportChannelExecutorThreadNameFormat = "Cloud-Spanner-TransportChannel-%d";
private int prefetchChunks = DEFAULT_PREFETCH_CHUNKS;
+ private DecodeMode decodeMode = DEFAULT_DECODE_MODE;
private SessionPoolOptions sessionPoolOptions;
private String databaseRole;
private ImmutableMap<String, String> sessionLabels;
@@ -797,6 +803,7 @@ protected Builder() {
options.transportChannelExecutorThreadNameFormat;
this.sessionPoolOptions = options.sessionPoolOptions;
this.prefetchChunks = options.prefetchChunks;
+ this.decodeMode = options.decodeMode;
this.databaseRole = options.databaseRole;
this.sessionLabels = options.sessionLabels;
this.spannerStubSettingsBuilder = options.spannerStubSettings.toBuilder();
@@ -1224,6 +1231,15 @@ public Builder setPrefetchChunks(int prefetchChunks) {
return this;
}
+ /**
+ * Specifies how values that are returned from a query should be decoded and converted from
+ * protobuf values into plain Java objects.
+ */
+ public Builder setDecodeMode(DecodeMode decodeMode) {
+ this.decodeMode = decodeMode;
+ return this;
+ }
+
@Override
public Builder setHost(String host) {
super.setHost(host);
@@ -1568,6 +1584,10 @@ public int getPrefetchChunks() {
return prefetchChunks;
}
+ public DecodeMode getDecodeMode() {
+ return decodeMode;
+ }
+
public static GrpcTransportOptions getDefaultGrpcTransportOptions() {
return GrpcTransportOptions.newBuilder().build();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Value.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Value.java
index a845eb118bf..3f0155e4a5e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Value.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Value.java
@@ -1518,6 +1518,10 @@ void valueToString(StringBuilder b) {
@Override
boolean valueEquals(Value v) {
+ // NaN == NaN always returns false, so we need a custom check.
+ if (Double.isNaN(this.value)) {
+ return Double.isNaN(((Float64Impl) v).value);
+ }
return ((Float64Impl) v).value == value;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java
index dc373cf03bd..c642d7e505a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java
@@ -16,28 +16,28 @@
package com.google.cloud.spanner.connection;
-import com.google.cloud.ByteArray;
-import com.google.cloud.Date;
-import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
+import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options.QueryOption;
+import com.google.cloud.spanner.ProtobufResultSet;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
-import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Type.Code;
+import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.ReadWriteTransaction.RetriableStatement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.hash.Funnel;
import com.google.common.hash.HashCode;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.hash.PrimitiveSink;
-import java.math.BigDecimal;
-import java.util.Objects;
+import com.google.protobuf.Value;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,11 +71,11 @@ class ChecksumResultSet extends ReplaceableForwardingResultSet implements Retria
private final ParsedStatement statement;
private final AnalyzeMode analyzeMode;
private final QueryOption[] options;
- private final ChecksumResultSet.ChecksumCalculator checksumCalculator = new ChecksumCalculator();
+ private final ChecksumCalculator checksumCalculator = new ChecksumCalculator();
ChecksumResultSet(
ReadWriteTransaction transaction,
- ResultSet delegate,
+ ProtobufResultSet delegate,
ParsedStatement statement,
AnalyzeMode analyzeMode,
QueryOption... options) {
@@ -91,6 +91,13 @@ class ChecksumResultSet extends ReplaceableForwardingResultSet implements Retria
this.options = options;
}
+ @Override
+ public Value getProtobufValue(int columnIndex) {
+ // We can safely cast to ProtobufResultSet here, as the constructor of this class only accepts
+ // instances of ProtobufResultSet.
+ return ((ProtobufResultSet) getDelegate()).getProtobufValue(columnIndex);
+ }
+
/** Simple {@link Callable} for calling {@link ResultSet#next()} */
private final class NextCallable implements Callable<Boolean> {
@Override
@@ -102,7 +109,7 @@ public Boolean call() {
boolean res = ChecksumResultSet.super.next();
// Only update the checksum if there was another row to be consumed.
if (res) {
- checksumCalculator.calculateNextChecksum(getCurrentRowAsStruct());
+ checksumCalculator.calculateNextChecksum(ChecksumResultSet.this);
}
numberOfNextCalls.incrementAndGet();
return res;
@@ -118,8 +125,9 @@ public boolean next() {
}
@VisibleForTesting
- HashCode getChecksum() {
- // HashCode is immutable and can be safely returned.
+ byte[] getChecksum() {
+ // Getting the checksum from the checksumCalculator will create a clone of the current digest
+ // and return the checksum from the clone, so it is safe to return this value.
return checksumCalculator.getChecksum();
}
@@ -132,8 +140,8 @@ HashCode getChecksum() {
@Override
public void retry(AbortedException aborted) throws AbortedException {
// Execute the same query and consume the result set to the same point as the original.
- ChecksumResultSet.ChecksumCalculator newChecksumCalculator = new ChecksumCalculator();
- ResultSet resultSet = null;
+ ChecksumCalculator newChecksumCalculator = new ChecksumCalculator();
+ ProtobufResultSet resultSet = null;
long counter = 0L;
try {
transaction
@@ -150,7 +158,7 @@ public void retry(AbortedException aborted) throws AbortedException {
statement, StatementExecutionStep.RETRY_NEXT_ON_RESULT_SET, transaction);
next = resultSet.next();
if (next) {
- newChecksumCalculator.calculateNextChecksum(resultSet.getCurrentRowAsStruct());
+ newChecksumCalculator.calculateNextChecksum(resultSet);
}
counter++;
}
@@ -168,9 +176,9 @@ public void retry(AbortedException aborted) throws AbortedException {
throw e;
}
// Check that we have the same number of rows and the same checksum.
- HashCode newChecksum = newChecksumCalculator.getChecksum();
- HashCode currentChecksum = checksumCalculator.getChecksum();
- if (counter == numberOfNextCalls.get() && Objects.equals(newChecksum, currentChecksum)) {
+ byte[] newChecksum = newChecksumCalculator.getChecksum();
+ byte[] currentChecksum = checksumCalculator.getChecksum();
+ if (counter == numberOfNextCalls.get() && Arrays.equals(newChecksum, currentChecksum)) {
// Checksum is ok, we only need to replace the delegate result set if it's still open.
if (isClosed()) {
resultSet.close();
@@ -184,222 +192,165 @@ public void retry(AbortedException aborted) throws AbortedException {
}
}
- /** Calculates and keeps the current checksum of a {@link ChecksumResultSet} */
+ /**
+ * Calculates a running checksum for all the data that has been seen so far in this result set. The
+ * calculation is performed on the protobuf values that were returned by Cloud Spanner, which
+ * means that no decoding of the results is needed (or allowed!) before calculating the checksum.
+ * This is more efficient, both in terms of CPU usage and memory consumption, especially if the
+ * consumer of the result set does not read all values, or is only reading the underlying protobuf
+ * values.
+ */
private static final class ChecksumCalculator {
- private static final HashFunction SHA256_FUNCTION = Hashing.sha256();
- private HashCode currentChecksum;
+ // Use a buffer of at most 1 MiB to hash string data. This means that strings of up to 1 MiB in
+ // size will be hashed in one go, while strings larger than 1 MiB will be chunked into pieces of
+ // at most 1 MiB and then fed into the digest. The digest internally creates a copy of the string
+ // that is being hashed, so chunking large strings prevents them from being loaded into memory
+ // twice.
+ private static final int MAX_BUFFER_SIZE = 1 << 20;
- private void calculateNextChecksum(Struct row) {
- Hasher hasher = SHA256_FUNCTION.newHasher();
- if (currentChecksum != null) {
- hasher.putBytes(currentChecksum.asBytes());
+ private boolean firstRow = true;
+ private final MessageDigest digest;
+ private ByteBuffer buffer;
+ private ByteBuffer float64Buffer;
+
+ ChecksumCalculator() {
+ try {
+ // This is safe, as all Java implementations are required to have MD5 implemented.
+ // See https://docs.oracle.com/javase/8/docs/api/java/security/MessageDigest.html
+ // MD5 requires less CPU power than SHA-256, and still offers a low enough collision
+ // probability for the use case at hand here.
+ digest = MessageDigest.getInstance("MD5");
+ } catch (Throwable t) {
+ throw SpannerExceptionFactory.asSpannerException(t);
}
- hasher.putObject(row, StructFunnel.INSTANCE);
- currentChecksum = hasher.hash();
}
- private HashCode getChecksum() {
- return currentChecksum;
+ private byte[] getChecksum() {
+ try {
+ // This is safe, as the MD5 MessageDigest is known to be cloneable.
+ MessageDigest clone = (MessageDigest) digest.clone();
+ return clone.digest();
+ } catch (CloneNotSupportedException e) {
+ throw SpannerExceptionFactory.asSpannerException(e);
+ }
}
- }
- /**
- * A {@link Funnel} implementation for calculating a {@link HashCode} for each row in a {@link
- * ResultSet}.
- */
- private enum StructFunnel implements Funnel {
- INSTANCE;
- private static final String NULL = "null";
-
- @Override
- public void funnel(Struct row, PrimitiveSink into) {
- for (int i = 0; i < row.getColumnCount(); i++) {
- if (row.isNull(i)) {
- funnelValue(Code.STRING, null, into);
+ private void calculateNextChecksum(ProtobufResultSet resultSet) {
+ if (firstRow) {
+ for (StructField field : resultSet.getType().getStructFields()) {
+ digest.update(field.getType().toString().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ for (int col = 0; col < resultSet.getColumnCount(); col++) {
+ Type type = resultSet.getColumnType(col);
+ if (resultSet.canGetProtobufValue(col)) {
+ Value value = resultSet.getProtobufValue(col);
+ digest.update((byte) value.getKindCase().getNumber());
+ pushValue(type, value);
} else {
- Code type = row.getColumnType(i).getCode();
- switch (type) {
- case ARRAY:
- funnelArray(row.getColumnType(i).getArrayElementType().getCode(), row, i, into);
- break;
- case BOOL:
- funnelValue(type, row.getBoolean(i), into);
- break;
- case BYTES:
- case PROTO:
- funnelValue(type, row.getBytes(i), into);
- break;
- case DATE:
- funnelValue(type, row.getDate(i), into);
- break;
- case FLOAT64:
- funnelValue(type, row.getDouble(i), into);
- break;
- case NUMERIC:
- funnelValue(type, row.getBigDecimal(i), into);
- break;
- case PG_NUMERIC:
- funnelValue(type, row.getString(i), into);
- break;
- case INT64:
- case ENUM:
- funnelValue(type, row.getLong(i), into);
- break;
- case STRING:
- funnelValue(type, row.getString(i), into);
- break;
- case JSON:
- funnelValue(type, row.getJson(i), into);
- break;
- case PG_JSONB:
- funnelValue(type, row.getPgJsonb(i), into);
- break;
- case TIMESTAMP:
- funnelValue(type, row.getTimestamp(i), into);
- break;
-
- case STRUCT:
- default:
- throw new IllegalArgumentException("unsupported row type");
- }
+ // This will normally not happen, unless the user explicitly sets the decoding mode to
+ // DIRECT for a query in a read/write transaction. The default decoding mode in the
+ // Connection API is set to LAZY_PER_COL.
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.FAILED_PRECONDITION,
+ "Failed to get the underlying protobuf value for the column "
+ + resultSet.getMetadata().getRowType().getFields(col).getName()
+ + ". "
+ + "Executing queries with DecodeMode#DIRECT is not supported in read/write transactions.");
}
}
+ firstRow = false;
}
- private void funnelArray(
- Code arrayElementType, Struct row, int columnIndex, PrimitiveSink into) {
- funnelValue(Code.STRING, "BeginArray", into);
- switch (arrayElementType) {
- case BOOL:
- into.putInt(row.getBooleanList(columnIndex).size());
- for (Boolean value : row.getBooleanList(columnIndex)) {
- funnelValue(Code.BOOL, value, into);
- }
+ private void pushValue(Type type, Value value) {
+ // Protobuf Value has a very limited set of possible types of values. All Cloud Spanner types
+ // are mapped to one of the protobuf values listed here, meaning that no changes are needed to
+ // this calculation when a new type is added to Cloud Spanner.
+ switch (value.getKindCase()) {
+ case NULL_VALUE:
+ // nothing needed, writing the KindCase is enough.
break;
- case BYTES:
- case PROTO:
- into.putInt(row.getBytesList(columnIndex).size());
- for (ByteArray value : row.getBytesList(columnIndex)) {
- funnelValue(Code.BYTES, value, into);
- }
+ case BOOL_VALUE:
+ digest.update(value.getBoolValue() ? (byte) 1 : 0);
break;
- case DATE:
- into.putInt(row.getDateList(columnIndex).size());
- for (Date value : row.getDateList(columnIndex)) {
- funnelValue(Code.DATE, value, into);
- }
+ case STRING_VALUE:
+ putString(value.getStringValue());
break;
- case FLOAT64:
- into.putInt(row.getDoubleList(columnIndex).size());
- for (Double value : row.getDoubleList(columnIndex)) {
- funnelValue(Code.FLOAT64, value, into);
+ case NUMBER_VALUE:
+ if (float64Buffer == null) {
+ // Create an 8-byte buffer that can be re-used for all float64 values in this result
+ // set.
+ float64Buffer = ByteBuffer.allocate(Double.BYTES);
+ } else {
+ float64Buffer.clear();
}
+ float64Buffer.putDouble(value.getNumberValue());
+ float64Buffer.flip();
+ digest.update(float64Buffer);
break;
- case NUMERIC:
- into.putInt(row.getBigDecimalList(columnIndex).size());
- for (BigDecimal value : row.getBigDecimalList(columnIndex)) {
- funnelValue(Code.NUMERIC, value, into);
+ case LIST_VALUE:
+ if (type.getCode() == Code.ARRAY) {
+ for (Value item : value.getListValue().getValuesList()) {
+ digest.update((byte) item.getKindCase().getNumber());
+ pushValue(type.getArrayElementType(), item);
+ }
+ } else {
+ // This should not be possible.
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.FAILED_PRECONDITION,
+ "List values that are not an ARRAY are not supported");
}
break;
- case PG_NUMERIC:
- into.putInt(row.getStringList(columnIndex).size());
- for (String value : row.getStringList(columnIndex)) {
- funnelValue(Code.STRING, value, into);
+ case STRUCT_VALUE:
+ if (type.getCode() == Code.STRUCT) {
+ for (int col = 0; col < type.getStructFields().size(); col++) {
+ String name = type.getStructFields().get(col).getName();
+ putString(name);
+ Value item = value.getStructValue().getFieldsMap().get(name);
+ digest.update((byte) item.getKindCase().getNumber());
+ pushValue(type.getStructFields().get(col).getType(), item);
+ }
+ } else {
+ // This should not be possible.
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.FAILED_PRECONDITION,
+ "Struct values without a struct type are not supported");
}
break;
- case INT64:
- case ENUM:
- into.putInt(row.getLongList(columnIndex).size());
- for (Long value : row.getLongList(columnIndex)) {
- funnelValue(Code.INT64, value, into);
- }
- break;
- case STRING:
- into.putInt(row.getStringList(columnIndex).size());
- for (String value : row.getStringList(columnIndex)) {
- funnelValue(Code.STRING, value, into);
- }
- break;
- case JSON:
- into.putInt(row.getJsonList(columnIndex).size());
- for (String value : row.getJsonList(columnIndex)) {
- funnelValue(Code.JSON, value, into);
- }
- break;
- case PG_JSONB:
- into.putInt(row.getPgJsonbList(columnIndex).size());
- for (String value : row.getPgJsonbList(columnIndex)) {
- funnelValue(Code.PG_JSONB, value, into);
- }
- break;
- case TIMESTAMP:
- into.putInt(row.getTimestampList(columnIndex).size());
- for (Timestamp value : row.getTimestampList(columnIndex)) {
- funnelValue(Code.TIMESTAMP, value, into);
- }
- break;
-
- case ARRAY:
- case STRUCT:
default:
- throw new IllegalArgumentException("unsupported array element type");
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.UNIMPLEMENTED, "Unsupported protobuf value: " + value.getKindCase());
}
- funnelValue(Code.STRING, "EndArray", into);
}
- private void funnelValue(Code type, T value, PrimitiveSink into) {
- // Include the type name in case the type of a column has changed.
- into.putUnencodedChars(type.name());
- if (value == null) {
- if (type == Code.BYTES || type == Code.STRING) {
- // Put length -1 to distinguish from the string value 'null'.
- into.putInt(-1);
- }
- into.putUnencodedChars(NULL);
+ /** Hashes a string value in blocks of max MAX_BUFFER_SIZE. */
+ private void putString(String stringValue) {
+ int length = stringValue.length();
+ if (buffer == null || (buffer.capacity() < MAX_BUFFER_SIZE && buffer.capacity() < length)) {
+ // Create a ByteBuffer with a maximum buffer size.
+ // This buffer is re-used for all string values in the result set.
+ buffer = ByteBuffer.allocate(Math.min(MAX_BUFFER_SIZE, length));
} else {
- switch (type) {
- case BOOL:
- into.putBoolean((Boolean) value);
- break;
- case BYTES:
- case PROTO:
- ByteArray byteArray = (ByteArray) value;
- into.putInt(byteArray.length());
- into.putBytes(byteArray.toByteArray());
- break;
- case DATE:
- Date date = (Date) value;
- into.putInt(date.getYear()).putInt(date.getMonth()).putInt(date.getDayOfMonth());
- break;
- case FLOAT64:
- into.putDouble((Double) value);
- break;
- case NUMERIC:
- String stringRepresentation = value.toString();
- into.putInt(stringRepresentation.length());
- into.putUnencodedChars(stringRepresentation);
- break;
- case INT64:
- case ENUM:
- into.putLong((Long) value);
- break;
- case PG_NUMERIC:
- case STRING:
- case JSON:
- case PG_JSONB:
- String stringValue = (String) value;
- into.putInt(stringValue.length());
- into.putUnencodedChars(stringValue);
- break;
- case TIMESTAMP:
- Timestamp timestamp = (Timestamp) value;
- into.putLong(timestamp.getSeconds()).putInt(timestamp.getNanos());
- break;
- case ARRAY:
- case STRUCT:
- default:
- throw new IllegalArgumentException("invalid type for single value");
- }
+ buffer.clear();
+ }
+
+ // Wrap the string in a CharBuffer. This allows us to read from the string in sections without
+ // creating a new copy of (a part of) the string. E.g. using something like substring(..)
+ // would create a copy of that part of the string, using CharBuffer.wrap(..) does not.
+ CharBuffer source = CharBuffer.wrap(stringValue);
+ CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
+ // source.hasRemaining() returns false when all the characters in the string have been
+ // processed.
+ while (source.hasRemaining()) {
+ // Encode the string into bytes and write them into the byte buffer.
+ // At most MAX_BUFFER_SIZE bytes will be written.
+ encoder.encode(source, buffer, false);
+ // Flip the buffer so we can read from the start.
+ buffer.flip();
+ // Put the bytes from the buffer into the digest.
+ digest.update(buffer);
+ // Flip the buffer again, so we can repeat and write to the start of the buffer again.
+ buffer.flip();
}
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DirectExecuteResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DirectExecuteResultSet.java
index dff915e2cce..1b15ec50822 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DirectExecuteResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DirectExecuteResultSet.java
@@ -19,6 +19,7 @@
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ProtobufResultSet;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
@@ -40,7 +41,7 @@
* to the actual query execution. It also ensures that any invalid query will throw an exception at
* execution instead of the first next() call by a client.
*/
-class DirectExecuteResultSet implements ResultSet {
+class DirectExecuteResultSet implements ProtobufResultSet {
private static final String MISSING_NEXT_CALL = "Must be preceded by a next() call";
private final ResultSet delegate;
private boolean nextCalledByClient = false;
@@ -79,6 +80,21 @@ public boolean next() throws SpannerException {
return initialNextResult;
}
+ @Override
+ public boolean canGetProtobufValue(int columnIndex) {
+ return nextCalledByClient
+ && delegate instanceof ProtobufResultSet
+ && ((ProtobufResultSet) delegate).canGetProtobufValue(columnIndex);
+ }
+
+ @Override
+ public com.google.protobuf.Value getProtobufValue(int columnIndex) {
+ Preconditions.checkState(nextCalledByClient, MISSING_NEXT_CALL);
+ Preconditions.checkState(
+ delegate instanceof ProtobufResultSet, "The result set does not support protobuf values");
+ return ((ProtobufResultSet) delegate).getProtobufValue(columnIndex);
+ }
+
@Override
public Struct getCurrentRowAsStruct() {
Preconditions.checkState(nextCalledByClient, MISSING_NEXT_CALL);
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
index e1fb87e4ade..6c4290c3b18 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
@@ -39,6 +39,7 @@
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
+import com.google.cloud.spanner.ProtobufResultSet;
import com.google.cloud.spanner.ReadContext;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
@@ -427,7 +428,7 @@ public ApiFuture executeQueryAsync(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
- ResultSet delegate =
+ DirectExecuteResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
@@ -797,7 +798,7 @@ T runWithRetry(Callable callable) throws SpannerException {
* returns a retryable {@link ResultSet}.
*/
private ResultSet createAndAddRetryResultSet(
- ResultSet resultSet,
+ ProtobufResultSet resultSet,
ParsedStatement statement,
AnalyzeMode analyzeMode,
QueryOption... options) {
@@ -1091,7 +1092,7 @@ interface RetriableStatement {
/** Creates a {@link ChecksumResultSet} for this {@link ReadWriteTransaction}. */
@VisibleForTesting
ChecksumResultSet createChecksumResultSet(
- ResultSet delegate,
+ ProtobufResultSet delegate,
ParsedStatement statement,
AnalyzeMode analyzeMode,
QueryOption... options) {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReplaceableForwardingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReplaceableForwardingResultSet.java
index 7370551a46f..a8de14e5121 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReplaceableForwardingResultSet.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReplaceableForwardingResultSet.java
@@ -20,6 +20,7 @@
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
+import com.google.cloud.spanner.ProtobufResultSet;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
@@ -42,7 +43,7 @@
* that is fetched using the new transaction. This is achieved by wrapping the returned result sets
* in a {@link ReplaceableForwardingResultSet} that replaces its delegate after a transaction retry.
*/
-class ReplaceableForwardingResultSet implements ResultSet {
+class ReplaceableForwardingResultSet implements ProtobufResultSet {
private ResultSet delegate;
private boolean closed;
@@ -60,6 +61,10 @@ void replaceDelegate(ResultSet delegate) {
this.delegate = delegate;
}
+ protected ResultSet getDelegate() {
+ return this.delegate;
+ }
+
private void checkClosed() {
if (closed) {
throw SpannerExceptionFactory.newSpannerException(
@@ -77,6 +82,21 @@ public boolean next() throws SpannerException {
return delegate.next();
}
+ @Override
+ public boolean canGetProtobufValue(int columnIndex) {
+ return !closed
+ && delegate instanceof ProtobufResultSet
+ && ((ProtobufResultSet) delegate).canGetProtobufValue(columnIndex);
+ }
+
+ @Override
+ public com.google.protobuf.Value getProtobufValue(int columnIndex) {
+ checkClosed();
+ Preconditions.checkState(
+ delegate instanceof ProtobufResultSet, "The result set does not support protobuf values");
+ return ((ProtobufResultSet) getDelegate()).getProtobufValue(columnIndex);
+ }
+
@Override
public Struct getCurrentRowAsStruct() {
checkClosed();
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
index 2a5a805c2c7..da8da78d92e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java
@@ -17,6 +17,7 @@
package com.google.cloud.spanner.connection;
import com.google.cloud.NoCredentials;
+import com.google.cloud.spanner.DecodeMode;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
@@ -342,6 +343,9 @@ Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) {
.setClientLibToken(MoreObjects.firstNonNull(key.userAgent, CONNECTION_API_CLIENT_LIB_TOKEN))
.setHost(key.host)
.setProjectId(key.projectId)
+ // Use lazy decoding, so we can use the protobuf values for calculating the checksum that is
+ // needed for read/write transactions.
+ .setDecodeMode(DecodeMode.LAZY_PER_COL)
.setDatabaseRole(options.getDatabaseRole())
.setCredentials(options.getCredentials());
builder.setSessionPoolOption(key.sessionPoolOptions);
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
index c9aa5987663..0f4b2275717 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java
@@ -243,6 +243,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private final Set readRetryableCodes;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
+ private final InstanceAdminStubSettings instanceAdminStubSettings;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final DatabaseAdminStub databaseAdminStub;
@@ -435,16 +436,15 @@ public GapicSpannerRpc(final SpannerOptions options) {
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
}
this.partitionedDmlStub = GrpcSpannerStub.create(pdmlSettings.build());
-
- this.instanceAdminStub =
- GrpcInstanceAdminStub.create(
- options
- .getInstanceAdminStubSettings()
- .toBuilder()
- .setTransportChannelProvider(channelProvider)
- .setCredentialsProvider(credentialsProvider)
- .setStreamWatchdogProvider(watchdogProvider)
- .build());
+ this.instanceAdminStubSettings =
+ options
+ .getInstanceAdminStubSettings()
+ .toBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentialsProvider)
+ .setStreamWatchdogProvider(watchdogProvider)
+ .build();
+ this.instanceAdminStub = GrpcInstanceAdminStub.create(instanceAdminStubSettings);
this.databaseAdminStubSettings =
options
@@ -510,6 +510,7 @@ public UnaryCallable createUnaryCalla
this.executeQueryRetryableCodes = null;
this.partitionedDmlStub = null;
this.databaseAdminStubSettings = null;
+ this.instanceAdminStubSettings = null;
this.spannerWatchdog = null;
this.partitionedDmlRetrySettings = null;
}
@@ -2004,6 +2005,16 @@ public boolean isClosed() {
return rpcIsClosed;
}
+ @Override
+ public DatabaseAdminStubSettings getDatabaseAdminStubSettings() {
+ return databaseAdminStubSettings;
+ }
+
+ @Override
+ public InstanceAdminStubSettings getInstanceAdminStubSettings() {
+ return instanceAdminStubSettings;
+ }
+
private static final class GrpcStreamingCall implements StreamingCall {
private final ApiCallContext callContext;
private final StreamController controller;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
index 89659e4741e..7868f3ec099 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java
@@ -28,7 +28,9 @@
import com.google.cloud.spanner.Restore;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
+import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
+import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.collect.ImmutableList;
import com.google.iam.v1.GetPolicyOptions;
@@ -500,4 +502,24 @@ TestIamPermissionsResponse testInstanceAdminIAMPermissions(
void shutdown();
boolean isClosed();
+
+ /**
+ * Getter method to obtain the auto-generated instance admin client stub settings.
+ *
+ * @return InstanceAdminStubSettings
+ */
+ @InternalApi
+ default InstanceAdminStubSettings getInstanceAdminStubSettings() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ /**
+ * Getter method to obtain the auto-generated database admin client stub settings.
+ *
+ * @return DatabaseAdminStubSettings
+ */
+ @InternalApi
+ default DatabaseAdminStubSettings getDatabaseAdminStubSettings() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
index ccbf3c0b2b9..8dfbb986eb8 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java
@@ -46,7 +46,6 @@
import com.google.cloud.ByteArray;
import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
-import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator;
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
index 914ce391f4a..cb73618d998 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java
@@ -56,13 +56,13 @@
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;
-/** Unit tests for {@link com.google.cloud.spanner.AbstractResultSet.GrpcResultSet}. */
+/** Unit tests for {@link GrpcResultSet}. */
@RunWith(JUnit4.class)
public class GrpcResultSetTest {
- private AbstractResultSet.GrpcResultSet resultSet;
+ private GrpcResultSet resultSet;
private SpannerRpc.ResultStreamConsumer consumer;
- private AbstractResultSet.GrpcStreamIterator stream;
+ private GrpcStreamIterator stream;
private final Duration streamWaitTimeout = Duration.ofNanos(1L);
private static class NoOpListener implements AbstractResultSet.Listener {
@@ -81,7 +81,7 @@ public void onDone(boolean withBeginTransaction) {}
@Before
public void setUp() {
- stream = new AbstractResultSet.GrpcStreamIterator(10);
+ stream = new GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
@@ -97,11 +97,11 @@ public void request(int numMessages) {}
},
false);
consumer = stream.consumer();
- resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
+ resultSet = new GrpcResultSet(stream, new NoOpListener());
}
- public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) {
- return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
+ public GrpcResultSet resultSetWithMode(QueryMode queryMode) {
+ return new GrpcResultSet(stream, new NoOpListener());
}
@Test
@@ -609,7 +609,7 @@ public com.google.protobuf.Value apply(@Nullable Value input) {
private void verifySerialization(
Function protoFn, Value... values) {
- resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
+ resultSet = new GrpcResultSet(stream, new NoOpListener());
PartialResultSet.Builder builder = PartialResultSet.newBuilder();
List types = new ArrayList<>();
for (Value value : values) {
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
index 7bf9f51a4ea..f27aa405aaa 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java
@@ -19,7 +19,6 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
-import com.google.cloud.spanner.AbstractResultSet.GrpcStruct;
import com.google.cloud.spanner.AbstractResultSet.LazyByteArray;
import com.google.cloud.spanner.SessionPool.SessionPoolTransactionContext;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
index af558d14dd4..a72c9872faf 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java
@@ -104,9 +104,9 @@ protected List getChildren() {
}
private static class TestCaseRunner {
- private AbstractResultSet.GrpcResultSet resultSet;
+ private GrpcResultSet resultSet;
private SpannerRpc.ResultStreamConsumer consumer;
- private AbstractResultSet.GrpcStreamIterator stream;
+ private GrpcStreamIterator stream;
private JSONObject testCase;
TestCaseRunner(JSONObject testCase) {
@@ -114,7 +114,7 @@ private static class TestCaseRunner {
}
private void run() throws Exception {
- stream = new AbstractResultSet.GrpcStreamIterator(10);
+ stream = new GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
@@ -130,7 +130,7 @@ public void request(int numMessages) {}
},
false);
consumer = stream.consumer();
- resultSet = new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
+ resultSet = new GrpcResultSet(stream, new NoOpListener());
JSONArray chunks = testCase.getJSONArray("chunks");
JSONObject expectedResult = testCase.getJSONObject("result");
@@ -143,8 +143,7 @@ public void request(int numMessages) {}
assertResultSet(resultSet, expectedResult.getJSONArray("value"));
}
- private void assertResultSet(AbstractResultSet.GrpcResultSet actual, JSONArray expected)
- throws Exception {
+ private void assertResultSet(GrpcResultSet actual, JSONArray expected) throws Exception {
int i = 0;
while (actual.next()) {
Struct actualRow = actual.getCurrentRowAsStruct();
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java
index 51cca1bc684..fc494c6f3ff 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java
@@ -17,7 +17,6 @@
package com.google.cloud.spanner;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
-import com.google.cloud.spanner.AbstractResultSet.GrpcResultSet;
import com.google.cloud.spanner.AbstractResultSet.Listener;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.PartialResultSet;
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
index 217e818d42c..d153696ab45 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java
@@ -24,7 +24,6 @@
import static org.mockito.Mockito.when;
import com.google.api.client.util.BackOff;
-import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
@@ -56,7 +55,7 @@
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
-/** Unit tests for {@link AbstractResultSet.ResumableStreamIterator}. */
+/** Unit tests for {@link ResumableStreamIterator}. */
@RunWith(JUnit4.class)
public class ResumableStreamIteratorTest {
interface Starter {
@@ -131,7 +130,7 @@ public boolean isWithBeginTransaction() {
}
Starter starter = Mockito.mock(Starter.class);
- AbstractResultSet.ResumableStreamIterator resumableStreamIterator;
+ ResumableStreamIterator resumableStreamIterator;
@Before
public void setUp() {
@@ -143,7 +142,7 @@ public void setUp() {
private void initWithLimit(int maxBufferSize) {
resumableStreamIterator =
- new AbstractResultSet.ResumableStreamIterator(
+ new ResumableStreamIterator(
maxBufferSize,
"",
new OpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class)),
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java
index 31a6cad4c8a..3cf13dc58d3 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java
@@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;
@@ -29,9 +30,16 @@
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.SpannerException.DoNotConstructDirectly;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
+import com.google.cloud.spanner.admin.database.v1.DatabaseAdminClient;
+import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub;
+import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
+import com.google.cloud.spanner.admin.instance.v1.InstanceAdminClient;
+import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub;
+import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.opentelemetry.api.OpenTelemetry;
+import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
@@ -55,6 +63,10 @@
public class SpannerImplTest {
@Mock private SpannerRpc rpc;
@Mock private SpannerOptions spannerOptions;
+ @Mock private DatabaseAdminStubSettings databaseAdminStubSettings;
+ @Mock private DatabaseAdminStub databaseAdminStub;
+ @Mock private InstanceAdminStubSettings instanceAdminStubSettings;
+ @Mock private InstanceAdminStub instanceAdminStub;
private SpannerImpl impl;
@Captor ArgumentCaptor