Skip to content

Commit a6ca4c1

Browse files
committed
format and documentation
1 parent 067a190 commit a6ca4c1

File tree

3 files changed

+177
-152
lines changed

3 files changed

+177
-152
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 100 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import com.google.cloud.spanner.Options.ListOption;
3838
import com.google.cloud.spanner.Options.QueryOption;
3939
import com.google.cloud.spanner.Options.ReadOption;
40-
import com.google.cloud.spanner.spi.v1.GapicSpannerRpc;
4140
import com.google.cloud.spanner.spi.v1.SpannerRpc;
4241
import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated;
4342
import com.google.common.annotations.VisibleForTesting;
@@ -191,8 +190,11 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
191190
}
192191

193192
private static void backoffSleep(Context context, long backoffMillis) throws SpannerException {
194-
tracer.getCurrentSpan().addAnnotation("Backing off",
195-
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
193+
tracer
194+
.getCurrentSpan()
195+
.addAnnotation(
196+
"Backing off",
197+
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis)));
196198
final CountDownLatch latch = new CountDownLatch(1);
197199
final Context.CancellationListener listener =
198200
new Context.CancellationListener() {
@@ -234,7 +236,8 @@ static <T> T runWithRetries(Callable<T> callable) {
234236
while (true) {
235237
attempt++;
236238
try {
237-
span.addAnnotation("Starting operation",
239+
span.addAnnotation(
240+
"Starting operation",
238241
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
239242
T result = callable.call();
240243
return result;
@@ -389,7 +392,8 @@ Object value() {
389392
return ImmutableMap.copyOf(tmp);
390393
}
391394

392-
private static <T extends Message> T unpack(Any response, Class<T> clazz) throws SpannerException {
395+
private static <T extends Message> T unpack(Any response, Class<T> clazz)
396+
throws SpannerException {
393397
try {
394398
return response.unpack(clazz);
395399
} catch (InvalidProtocolBufferException e) {
@@ -398,7 +402,7 @@ private static <T extends Message> T unpack(Any response, Class<T> clazz) throws
398402
}
399403
}
400404

401-
private static abstract class PageFetcher<S, T> implements NextPageFetcher<S> {
405+
private abstract static class PageFetcher<S, T> implements NextPageFetcher<S> {
402406
private String nextPageToken;
403407

404408
@Override
@@ -794,12 +798,12 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
794798
Mutation.toProto(mutations, mutationsProto);
795799
final CommitRequest request =
796800
CommitRequest.newBuilder()
797-
.setSession(name)
798-
.addAllMutations(mutationsProto)
799-
.setSingleUseTransaction(
800-
TransactionOptions.newBuilder()
801-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
802-
.build();
801+
.setSession(name)
802+
.addAllMutations(mutationsProto)
803+
.setSingleUseTransaction(
804+
TransactionOptions.newBuilder()
805+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
806+
.build();
803807
Span span = tracer.spanBuilder(COMMIT).startSpan();
804808
try (Scope s = tracer.withSpan(span)) {
805809
CommitResponse response =
@@ -889,11 +893,11 @@ ByteString beginTransaction() {
889893
try (Scope s = tracer.withSpan(span)) {
890894
final BeginTransactionRequest request =
891895
BeginTransactionRequest.newBuilder()
892-
.setSession(name)
893-
.setOptions(
894-
TransactionOptions.newBuilder()
895-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
896-
.build();
896+
.setSession(name)
897+
.setOptions(
898+
TransactionOptions.newBuilder()
899+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
900+
.build();
897901
Transaction txn =
898902
runWithRetries(
899903
new Callable<Transaction>() {
@@ -955,8 +959,8 @@ private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPref
955959
this(session, rpc, defaultPrefetchChunks, Tracing.getTracer().getCurrentSpan());
956960
}
957961

958-
private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks,
959-
Span span) {
962+
private AbstractReadContext(
963+
SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks, Span span) {
960964
this.session = session;
961965
this.rpc = rpc;
962966
this.defaultPrefetchChunks = defaultPrefetchChunks;
@@ -1056,15 +1060,17 @@ ResultSet executeQueryInternalWithOptions(
10561060
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
10571061
@Override
10581062
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
1059-
return new CloseableServerStreamIterator<PartialResultSet>(rpc.executeQuery(
1060-
resumeToken == null
1061-
? request
1062-
: request.toBuilder().setResumeToken(resumeToken).build(),
1063-
null,
1064-
session.options));
1065-
1066-
// Let resume fail for now. Gapic has its own resume, but in order not
1067-
// to introduce too much change at a time, we decide to plumb up
1063+
return new CloseableServerStreamIterator<PartialResultSet>(
1064+
rpc.executeQuery(
1065+
resumeToken == null
1066+
? request
1067+
: request.toBuilder().setResumeToken(resumeToken).build(),
1068+
null,
1069+
session.options));
1070+
1071+
// TODO(hzyi): make resume work
1072+
// Let resume fail for now. Gapic has its own resume, but in order not
1073+
// to introduce too many changes at a time, we decide to plumb up
10681074
// ServerStream first and then figure out how to make resume work
10691075
}
10701076
};
@@ -1165,15 +1171,17 @@ ResultSet readInternalWithOptions(
11651171
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
11661172
@Override
11671173
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
1168-
return new CloseableServerStreamIterator<PartialResultSet>(rpc.read(
1169-
resumeToken == null
1170-
? request
1171-
: request.toBuilder().setResumeToken(resumeToken).build(),
1172-
null,
1173-
session.options));
1174-
1175-
// Let resume fail for now. Gapic has its own resume, but in order not
1176-
// to introduce too much change at a time, we decide to plumb up
1174+
return new CloseableServerStreamIterator<PartialResultSet>(
1175+
rpc.read(
1176+
resumeToken == null
1177+
? request
1178+
: request.toBuilder().setResumeToken(resumeToken).build(),
1179+
null,
1180+
session.options));
1181+
1182+
// TODO(hzyi): make resume work
1183+
// Let resume fail for now. Gapic has its own resume, but in order not
1184+
// to introduce too many changes at a time, we decide to plumb up
11771185
// ServerStream first and then figure out how to make resume work
11781186
}
11791187
};
@@ -1226,8 +1234,8 @@ void backoffSleep(Context context, long backoffMillis) {
12261234
this.span = Tracing.getTracer().getCurrentSpan();
12271235
ByteString transactionId = session.readyTransactionId;
12281236
session.readyTransactionId = null;
1229-
this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks,
1230-
span);
1237+
this.txn =
1238+
new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks, span);
12311239
}
12321240

12331241
TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) {
@@ -1258,7 +1266,8 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12581266
attempt++;
12591267
// TODO(user): When using streaming reads, consider using the first read to begin
12601268
// the txn.
1261-
span.addAnnotation("Starting Transaction Attempt",
1269+
span.addAnnotation(
1270+
"Starting Transaction Attempt",
12621271
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
12631272
txn.ensureTxn();
12641273

@@ -1270,7 +1279,8 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12701279
} catch (Exception e) {
12711280
txnLogger.log(Level.FINE, "User-provided TransactionCallable raised exception", e);
12721281
if (txn.isAborted()) {
1273-
span.addAnnotation("Transaction Attempt Aborted in user operation. Retrying",
1282+
span.addAnnotation(
1283+
"Transaction Attempt Aborted in user operation. Retrying",
12741284
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
12751285
shouldRollback = false;
12761286
backoff(context, backoff);
@@ -1282,10 +1292,12 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12821292
} else {
12831293
toThrow = newSpannerException(ErrorCode.UNKNOWN, e.getMessage(), e);
12841294
}
1285-
span.addAnnotation("Transaction Attempt Failed in user operation",
1295+
span.addAnnotation(
1296+
"Transaction Attempt Failed in user operation",
12861297
ImmutableMap.<String, AttributeValue>builder()
1287-
.putAll(TraceUtil.getExceptionAnnotations(toThrow))
1288-
.put("Attempt", AttributeValue.longAttributeValue(attempt)).build());
1298+
.putAll(TraceUtil.getExceptionAnnotations(toThrow))
1299+
.put("Attempt", AttributeValue.longAttributeValue(attempt))
1300+
.build());
12891301
throw toThrow;
12901302
} finally {
12911303
if (shouldRollback) {
@@ -1295,19 +1307,23 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12951307

12961308
try {
12971309
txn.commit();
1298-
span.addAnnotation("Transaction Attempt Succeeded",
1310+
span.addAnnotation(
1311+
"Transaction Attempt Succeeded",
12991312
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
13001313
return result;
13011314
} catch (AbortedException e) {
13021315
txnLogger.log(Level.FINE, "Commit aborted", e);
1303-
span.addAnnotation("Transaction Attempt Aborted in Commit. Retrying",
1316+
span.addAnnotation(
1317+
"Transaction Attempt Aborted in Commit. Retrying",
13041318
ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt)));
13051319
backoff(context, backoff);
13061320
} catch (SpannerException e) {
1307-
span.addAnnotation("Transaction Attempt Failed in Commit",
1321+
span.addAnnotation(
1322+
"Transaction Attempt Failed in Commit",
13081323
ImmutableMap.<String, AttributeValue>builder()
1309-
.putAll(TraceUtil.getExceptionAnnotations(e))
1310-
.put("Attempt", AttributeValue.longAttributeValue(attempt)).build());
1324+
.putAll(TraceUtil.getExceptionAnnotations(e))
1325+
.put("Attempt", AttributeValue.longAttributeValue(attempt))
1326+
.build());
13111327
throw e;
13121328
}
13131329
}
@@ -1326,8 +1342,8 @@ public void invalidate() {
13261342
private void backoff(Context context, BackOff backoff) {
13271343
long delay = txn.getRetryDelayInMillis(backoff);
13281344
txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks, span);
1329-
span.addAnnotation("Backing off",
1330-
ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay)));
1345+
span.addAnnotation(
1346+
"Backing off", ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay)));
13311347
sleeper.backoffSleep(context, delay);
13321348
}
13331349
}
@@ -1362,8 +1378,10 @@ void ensureTxn() {
13621378
span.addAnnotation("Creating Transaction");
13631379
try {
13641380
transactionId = session.beginTransaction();
1365-
span.addAnnotation("Transaction Creation Done", ImmutableMap.of("Id",
1366-
AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
1381+
span.addAnnotation(
1382+
"Transaction Creation Done",
1383+
ImmutableMap.of(
1384+
"Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
13671385
txnLogger.log(
13681386
Level.FINER,
13691387
"Started transaction {0}",
@@ -1373,9 +1391,10 @@ void ensureTxn() {
13731391
throw e;
13741392
}
13751393
} else {
1376-
span.addAnnotation("Transaction Initialized",
1377-
ImmutableMap.of("Id", AttributeValue.stringAttributeValue(
1378-
transactionId.toStringUtf8())));
1394+
span.addAnnotation(
1395+
"Transaction Initialized",
1396+
ImmutableMap.of(
1397+
"Id", AttributeValue.stringAttributeValue(transactionId.toStringUtf8())));
13791398
txnLogger.log(
13801399
Level.FINER,
13811400
"Using prepared transaction {0}",
@@ -1677,9 +1696,9 @@ void initTransaction() {
16771696
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
16781697
final BeginTransactionRequest request =
16791698
BeginTransactionRequest.newBuilder()
1680-
.setSession(session.getName())
1681-
.setOptions(options)
1682-
.build();
1699+
.setSession(session.getName())
1700+
.setOptions(options)
1701+
.build();
16831702
Transaction transaction =
16841703
runWithRetries(
16851704
new Callable<Transaction>() {
@@ -1703,8 +1722,8 @@ public Transaction call() throws Exception {
17031722
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
17041723
}
17051724
transactionId = transaction.getId();
1706-
span.addAnnotation("Transaction Creation Done",
1707-
TraceUtil.getTransactionAnnotations(transaction));
1725+
span.addAnnotation(
1726+
"Transaction Creation Done", TraceUtil.getTransactionAnnotations(transaction));
17081727
} catch (SpannerException e) {
17091728
span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e));
17101729
throw e;
@@ -1915,8 +1934,8 @@ private static class GrpcStruct extends Struct implements Serializable {
19151934
protected final List<Object> rowData;
19161935

19171936
/**
1918-
* Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used
1919-
* as a serialization proxy.
1937+
* Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as
1938+
* a serialization proxy.
19201939
*/
19211940
private Object writeReplace() {
19221941
Builder builder = Struct.newBuilder();
@@ -1972,7 +1991,10 @@ private Object writeReplace() {
19721991
builder.set(fieldName).toDateArray((Iterable<Date>) value);
19731992
break;
19741993
case STRUCT:
1975-
builder.add(fieldName, fieldType.getArrayElementType().getStructFields(), (Iterable<Struct>) value);
1994+
builder.add(
1995+
fieldName,
1996+
fieldType.getArrayElementType().getStructFields(),
1997+
(Iterable<Struct>) value);
19761998
break;
19771999
default:
19782000
throw new AssertionError(
@@ -1983,7 +2005,6 @@ private Object writeReplace() {
19832005
default:
19842006
throw new AssertionError("Unhandled type code: " + fieldType.getCode());
19852007
}
1986-
19872008
}
19882009
return builder.build();
19892010
}
@@ -2294,8 +2315,7 @@ public CloseableServerStreamIterator(ServerStream<T> stream) {
22942315
public boolean hasNext() {
22952316
try {
22962317
return iterator.hasNext();
2297-
}
2298-
catch (Exception e) {
2318+
} catch (Exception e) {
22992319
throw SpannerExceptionFactory.newSpannerException(e);
23002320
}
23012321
}
@@ -2304,18 +2324,21 @@ public boolean hasNext() {
23042324
public T next() {
23052325
try {
23062326
return iterator.next();
2307-
}
2308-
catch (Exception e) {
2327+
} catch (Exception e) {
23092328
throw SpannerExceptionFactory.newSpannerException(e);
23102329
}
23112330
}
23122331

2332+
@Override
2333+
public void remove() {
2334+
throw UnsupportedOperationException("Not supported: remove");
2335+
}
2336+
23132337
@Override
23142338
public void close(@Nullable String message) {
23152339
try {
23162340
stream.cancel();
2317-
}
2318-
catch (Exception e) {
2341+
} catch (Exception e) {
23192342
throw SpannerExceptionFactory.newSpannerException(e);
23202343
}
23212344
}
@@ -2458,8 +2481,10 @@ protected PartialResultSet computeNext() {
24582481
while (true) {
24592482
// Eagerly start stream before consuming any buffered items.
24602483
if (stream == null) {
2461-
span.addAnnotation("Starting/Resuming stream",
2462-
ImmutableMap.of("ResumeToken",
2484+
span.addAnnotation(
2485+
"Starting/Resuming stream",
2486+
ImmutableMap.of(
2487+
"ResumeToken",
24632488
AttributeValue.stringAttributeValue(
24642489
resumeToken == null ? "null" : resumeToken.toStringUtf8())));
24652490
stream = checkNotNull(startStream(resumeToken));
@@ -2498,8 +2523,8 @@ protected PartialResultSet computeNext() {
24982523
}
24992524
} catch (SpannerException e) {
25002525
if (safeToRetry && e.isRetryable()) {
2501-
span.addAnnotation("Stream broken. Safe to retry",
2502-
TraceUtil.getExceptionAnnotations(e));
2526+
span.addAnnotation(
2527+
"Stream broken. Safe to retry", TraceUtil.getExceptionAnnotations(e));
25032528
logger.log(Level.FINE, "Retryable exception, will sleep and retry", e);
25042529
// Truncate any items in the buffer before the last retry token.
25052530
while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) {

0 commit comments

Comments
 (0)