3737import com .google .cloud .spanner .Options .ListOption ;
3838import com .google .cloud .spanner .Options .QueryOption ;
3939import com .google .cloud .spanner .Options .ReadOption ;
40- import com .google .cloud .spanner .spi .v1 .GapicSpannerRpc ;
4140import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
4241import com .google .cloud .spanner .spi .v1 .SpannerRpc .Paginated ;
4342import com .google .common .annotations .VisibleForTesting ;
@@ -191,8 +190,11 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException {
191190 }
192191
193192 private static void backoffSleep (Context context , long backoffMillis ) throws SpannerException {
194- tracer .getCurrentSpan ().addAnnotation ("Backing off" ,
195- ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (backoffMillis )));
193+ tracer
194+ .getCurrentSpan ()
195+ .addAnnotation (
196+ "Backing off" ,
197+ ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (backoffMillis )));
196198 final CountDownLatch latch = new CountDownLatch (1 );
197199 final Context .CancellationListener listener =
198200 new Context .CancellationListener () {
@@ -234,7 +236,8 @@ static <T> T runWithRetries(Callable<T> callable) {
234236 while (true ) {
235237 attempt ++;
236238 try {
237- span .addAnnotation ("Starting operation" ,
239+ span .addAnnotation (
240+ "Starting operation" ,
238241 ImmutableMap .of ("Attempt" , AttributeValue .longAttributeValue (attempt )));
239242 T result = callable .call ();
240243 return result ;
@@ -389,7 +392,8 @@ Object value() {
389392 return ImmutableMap .copyOf (tmp );
390393 }
391394
392- private static <T extends Message > T unpack (Any response , Class <T > clazz ) throws SpannerException {
395+ private static <T extends Message > T unpack (Any response , Class <T > clazz )
396+ throws SpannerException {
393397 try {
394398 return response .unpack (clazz );
395399 } catch (InvalidProtocolBufferException e ) {
@@ -398,7 +402,7 @@ private static <T extends Message> T unpack(Any response, Class<T> clazz) throws
398402 }
399403 }
400404
401- private static abstract class PageFetcher <S , T > implements NextPageFetcher <S > {
405+ private abstract static class PageFetcher <S , T > implements NextPageFetcher <S > {
402406 private String nextPageToken ;
403407
404408 @ Override
@@ -794,12 +798,12 @@ public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerEx
794798 Mutation .toProto (mutations , mutationsProto );
795799 final CommitRequest request =
796800 CommitRequest .newBuilder ()
797- .setSession (name )
798- .addAllMutations (mutationsProto )
799- .setSingleUseTransaction (
800- TransactionOptions .newBuilder ()
801- .setReadWrite (TransactionOptions .ReadWrite .getDefaultInstance ()))
802- .build ();
801+ .setSession (name )
802+ .addAllMutations (mutationsProto )
803+ .setSingleUseTransaction (
804+ TransactionOptions .newBuilder ()
805+ .setReadWrite (TransactionOptions .ReadWrite .getDefaultInstance ()))
806+ .build ();
803807 Span span = tracer .spanBuilder (COMMIT ).startSpan ();
804808 try (Scope s = tracer .withSpan (span )) {
805809 CommitResponse response =
@@ -889,11 +893,11 @@ ByteString beginTransaction() {
889893 try (Scope s = tracer .withSpan (span )) {
890894 final BeginTransactionRequest request =
891895 BeginTransactionRequest .newBuilder ()
892- .setSession (name )
893- .setOptions (
894- TransactionOptions .newBuilder ()
895- .setReadWrite (TransactionOptions .ReadWrite .getDefaultInstance ()))
896- .build ();
896+ .setSession (name )
897+ .setOptions (
898+ TransactionOptions .newBuilder ()
899+ .setReadWrite (TransactionOptions .ReadWrite .getDefaultInstance ()))
900+ .build ();
897901 Transaction txn =
898902 runWithRetries (
899903 new Callable <Transaction >() {
@@ -955,8 +959,8 @@ private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPref
955959 this (session , rpc , defaultPrefetchChunks , Tracing .getTracer ().getCurrentSpan ());
956960 }
957961
958- private AbstractReadContext (SessionImpl session , SpannerRpc rpc , int defaultPrefetchChunks ,
959- Span span ) {
962+ private AbstractReadContext (
963+ SessionImpl session , SpannerRpc rpc , int defaultPrefetchChunks , Span span ) {
960964 this .session = session ;
961965 this .rpc = rpc ;
962966 this .defaultPrefetchChunks = defaultPrefetchChunks ;
@@ -1056,15 +1060,17 @@ ResultSet executeQueryInternalWithOptions(
10561060 new ResumableStreamIterator (MAX_BUFFERED_CHUNKS , QUERY ) {
10571061 @ Override
10581062 CloseableIterator <PartialResultSet > startStream (@ Nullable ByteString resumeToken ) {
1059- return new CloseableServerStreamIterator <PartialResultSet >(rpc .executeQuery (
1060- resumeToken == null
1061- ? request
1062- : request .toBuilder ().setResumeToken (resumeToken ).build (),
1063- null ,
1064- session .options ));
1065-
1066- // Let resume fail for now. Gapic has its own resume, but in order not
1067- // to introduce too much change at a time, we decide to plumb up
1063+ return new CloseableServerStreamIterator <PartialResultSet >(
1064+ rpc .executeQuery (
1065+ resumeToken == null
1066+ ? request
1067+ : request .toBuilder ().setResumeToken (resumeToken ).build (),
1068+ null ,
1069+ session .options ));
1070+
1071+ // TODO(hzyi): make resume work
1072+ // Let resume fail for now. Gapic has its own resume, but in order not
1073+ // to introduce too many changes at a time, we decide to plumb up
10681074 // ServerStream first and then figure out how to make resume work
10691075 }
10701076 };
@@ -1165,15 +1171,17 @@ ResultSet readInternalWithOptions(
11651171 new ResumableStreamIterator (MAX_BUFFERED_CHUNKS , READ ) {
11661172 @ Override
11671173 CloseableIterator <PartialResultSet > startStream (@ Nullable ByteString resumeToken ) {
1168- return new CloseableServerStreamIterator <PartialResultSet >(rpc .read (
1169- resumeToken == null
1170- ? request
1171- : request .toBuilder ().setResumeToken (resumeToken ).build (),
1172- null ,
1173- session .options ));
1174-
1175- // Let resume fail for now. Gapic has its own resume, but in order not
1176- // to introduce too much change at a time, we decide to plumb up
1174+ return new CloseableServerStreamIterator <PartialResultSet >(
1175+ rpc .read (
1176+ resumeToken == null
1177+ ? request
1178+ : request .toBuilder ().setResumeToken (resumeToken ).build (),
1179+ null ,
1180+ session .options ));
1181+
1182+ // TODO(hzyi): make resume work
1183+ // Let resume fail for now. Gapic has its own resume, but in order not
1184+ // to introduce too many changes at a time, we decide to plumb up
11771185 // ServerStream first and then figure out how to make resume work
11781186 }
11791187 };
@@ -1226,8 +1234,8 @@ void backoffSleep(Context context, long backoffMillis) {
12261234 this .span = Tracing .getTracer ().getCurrentSpan ();
12271235 ByteString transactionId = session .readyTransactionId ;
12281236 session .readyTransactionId = null ;
1229- this .txn = new TransactionContextImpl ( session , transactionId , rpc , defaultPrefetchChunks ,
1230- span );
1237+ this .txn =
1238+ new TransactionContextImpl ( session , transactionId , rpc , defaultPrefetchChunks , span );
12311239 }
12321240
12331241 TransactionRunnerImpl (SessionImpl session , SpannerRpc rpc , int defaultPrefetchChunks ) {
@@ -1258,7 +1266,8 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12581266 attempt ++;
12591267 // TODO(user): When using streaming reads, consider using the first read to begin
12601268 // the txn.
1261- span .addAnnotation ("Starting Transaction Attempt" ,
1269+ span .addAnnotation (
1270+ "Starting Transaction Attempt" ,
12621271 ImmutableMap .of ("Attempt" , AttributeValue .longAttributeValue (attempt )));
12631272 txn .ensureTxn ();
12641273
@@ -1270,7 +1279,8 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12701279 } catch (Exception e ) {
12711280 txnLogger .log (Level .FINE , "User-provided TransactionCallable raised exception" , e );
12721281 if (txn .isAborted ()) {
1273- span .addAnnotation ("Transaction Attempt Aborted in user operation. Retrying" ,
1282+ span .addAnnotation (
1283+ "Transaction Attempt Aborted in user operation. Retrying" ,
12741284 ImmutableMap .of ("Attempt" , AttributeValue .longAttributeValue (attempt )));
12751285 shouldRollback = false ;
12761286 backoff (context , backoff );
@@ -1282,10 +1292,12 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12821292 } else {
12831293 toThrow = newSpannerException (ErrorCode .UNKNOWN , e .getMessage (), e );
12841294 }
1285- span .addAnnotation ("Transaction Attempt Failed in user operation" ,
1295+ span .addAnnotation (
1296+ "Transaction Attempt Failed in user operation" ,
12861297 ImmutableMap .<String , AttributeValue >builder ()
1287- .putAll (TraceUtil .getExceptionAnnotations (toThrow ))
1288- .put ("Attempt" , AttributeValue .longAttributeValue (attempt )).build ());
1298+ .putAll (TraceUtil .getExceptionAnnotations (toThrow ))
1299+ .put ("Attempt" , AttributeValue .longAttributeValue (attempt ))
1300+ .build ());
12891301 throw toThrow ;
12901302 } finally {
12911303 if (shouldRollback ) {
@@ -1295,19 +1307,23 @@ private <T> T runInternal(TransactionCallable<T> callable) {
12951307
12961308 try {
12971309 txn .commit ();
1298- span .addAnnotation ("Transaction Attempt Succeeded" ,
1310+ span .addAnnotation (
1311+ "Transaction Attempt Succeeded" ,
12991312 ImmutableMap .of ("Attempt" , AttributeValue .longAttributeValue (attempt )));
13001313 return result ;
13011314 } catch (AbortedException e ) {
13021315 txnLogger .log (Level .FINE , "Commit aborted" , e );
1303- span .addAnnotation ("Transaction Attempt Aborted in Commit. Retrying" ,
1316+ span .addAnnotation (
1317+ "Transaction Attempt Aborted in Commit. Retrying" ,
13041318 ImmutableMap .of ("Attempt" , AttributeValue .longAttributeValue (attempt )));
13051319 backoff (context , backoff );
13061320 } catch (SpannerException e ) {
1307- span .addAnnotation ("Transaction Attempt Failed in Commit" ,
1321+ span .addAnnotation (
1322+ "Transaction Attempt Failed in Commit" ,
13081323 ImmutableMap .<String , AttributeValue >builder ()
1309- .putAll (TraceUtil .getExceptionAnnotations (e ))
1310- .put ("Attempt" , AttributeValue .longAttributeValue (attempt )).build ());
1324+ .putAll (TraceUtil .getExceptionAnnotations (e ))
1325+ .put ("Attempt" , AttributeValue .longAttributeValue (attempt ))
1326+ .build ());
13111327 throw e ;
13121328 }
13131329 }
@@ -1326,8 +1342,8 @@ public void invalidate() {
13261342 private void backoff (Context context , BackOff backoff ) {
13271343 long delay = txn .getRetryDelayInMillis (backoff );
13281344 txn = new TransactionContextImpl (session , null , txn .rpc , txn .defaultPrefetchChunks , span );
1329- span .addAnnotation ("Backing off" ,
1330- ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (delay )));
1345+ span .addAnnotation (
1346+ "Backing off" , ImmutableMap .of ("Delay" , AttributeValue .longAttributeValue (delay )));
13311347 sleeper .backoffSleep (context , delay );
13321348 }
13331349 }
@@ -1362,8 +1378,10 @@ void ensureTxn() {
13621378 span .addAnnotation ("Creating Transaction" );
13631379 try {
13641380 transactionId = session .beginTransaction ();
1365- span .addAnnotation ("Transaction Creation Done" , ImmutableMap .of ("Id" ,
1366- AttributeValue .stringAttributeValue (transactionId .toStringUtf8 ())));
1381+ span .addAnnotation (
1382+ "Transaction Creation Done" ,
1383+ ImmutableMap .of (
1384+ "Id" , AttributeValue .stringAttributeValue (transactionId .toStringUtf8 ())));
13671385 txnLogger .log (
13681386 Level .FINER ,
13691387 "Started transaction {0}" ,
@@ -1373,9 +1391,10 @@ void ensureTxn() {
13731391 throw e ;
13741392 }
13751393 } else {
1376- span .addAnnotation ("Transaction Initialized" ,
1377- ImmutableMap .of ("Id" , AttributeValue .stringAttributeValue (
1378- transactionId .toStringUtf8 ())));
1394+ span .addAnnotation (
1395+ "Transaction Initialized" ,
1396+ ImmutableMap .of (
1397+ "Id" , AttributeValue .stringAttributeValue (transactionId .toStringUtf8 ())));
13791398 txnLogger .log (
13801399 Level .FINER ,
13811400 "Using prepared transaction {0}" ,
@@ -1677,9 +1696,9 @@ void initTransaction() {
16771696 bound .applyToBuilder (options .getReadOnlyBuilder ()).setReturnReadTimestamp (true );
16781697 final BeginTransactionRequest request =
16791698 BeginTransactionRequest .newBuilder ()
1680- .setSession (session .getName ())
1681- .setOptions (options )
1682- .build ();
1699+ .setSession (session .getName ())
1700+ .setOptions (options )
1701+ .build ();
16831702 Transaction transaction =
16841703 runWithRetries (
16851704 new Callable <Transaction >() {
@@ -1703,8 +1722,8 @@ public Transaction call() throws Exception {
17031722 ErrorCode .INTERNAL , "Bad value in transaction.read_timestamp metadata field" , e );
17041723 }
17051724 transactionId = transaction .getId ();
1706- span .addAnnotation ("Transaction Creation Done" ,
1707- TraceUtil .getTransactionAnnotations (transaction ));
1725+ span .addAnnotation (
1726+ "Transaction Creation Done" , TraceUtil .getTransactionAnnotations (transaction ));
17081727 } catch (SpannerException e ) {
17091728 span .addAnnotation ("Transaction Creation Failed" , TraceUtil .getExceptionAnnotations (e ));
17101729 throw e ;
@@ -1915,8 +1934,8 @@ private static class GrpcStruct extends Struct implements Serializable {
19151934 protected final List <Object > rowData ;
19161935
19171936 /**
1918- * Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used
1919- * as a serialization proxy.
1937+ * Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as
1938+ * a serialization proxy.
19201939 */
19211940 private Object writeReplace () {
19221941 Builder builder = Struct .newBuilder ();
@@ -1972,7 +1991,10 @@ private Object writeReplace() {
19721991 builder .set (fieldName ).toDateArray ((Iterable <Date >) value );
19731992 break ;
19741993 case STRUCT :
1975- builder .add (fieldName , fieldType .getArrayElementType ().getStructFields (), (Iterable <Struct >) value );
1994+ builder .add (
1995+ fieldName ,
1996+ fieldType .getArrayElementType ().getStructFields (),
1997+ (Iterable <Struct >) value );
19761998 break ;
19771999 default :
19782000 throw new AssertionError (
@@ -1983,7 +2005,6 @@ private Object writeReplace() {
19832005 default :
19842006 throw new AssertionError ("Unhandled type code: " + fieldType .getCode ());
19852007 }
1986-
19872008 }
19882009 return builder .build ();
19892010 }
@@ -2294,8 +2315,7 @@ public CloseableServerStreamIterator(ServerStream<T> stream) {
22942315 public boolean hasNext () {
22952316 try {
22962317 return iterator .hasNext ();
2297- }
2298- catch (Exception e ) {
2318+ } catch (Exception e ) {
22992319 throw SpannerExceptionFactory .newSpannerException (e );
23002320 }
23012321 }
@@ -2304,18 +2324,21 @@ public boolean hasNext() {
23042324 public T next () {
23052325 try {
23062326 return iterator .next ();
2307- }
2308- catch (Exception e ) {
2327+ } catch (Exception e ) {
23092328 throw SpannerExceptionFactory .newSpannerException (e );
23102329 }
23112330 }
23122331
2332+ @ Override
2333+ public void remove () {
2334+ throw UnsupportedOperationException ("Not supported: remove" );
2335+ }
2336+
23132337 @ Override
23142338 public void close (@ Nullable String message ) {
23152339 try {
23162340 stream .cancel ();
2317- }
2318- catch (Exception e ) {
2341+ } catch (Exception e ) {
23192342 throw SpannerExceptionFactory .newSpannerException (e );
23202343 }
23212344 }
@@ -2458,8 +2481,10 @@ protected PartialResultSet computeNext() {
24582481 while (true ) {
24592482 // Eagerly start stream before consuming any buffered items.
24602483 if (stream == null ) {
2461- span .addAnnotation ("Starting/Resuming stream" ,
2462- ImmutableMap .of ("ResumeToken" ,
2484+ span .addAnnotation (
2485+ "Starting/Resuming stream" ,
2486+ ImmutableMap .of (
2487+ "ResumeToken" ,
24632488 AttributeValue .stringAttributeValue (
24642489 resumeToken == null ? "null" : resumeToken .toStringUtf8 ())));
24652490 stream = checkNotNull (startStream (resumeToken ));
@@ -2498,8 +2523,8 @@ protected PartialResultSet computeNext() {
24982523 }
24992524 } catch (SpannerException e ) {
25002525 if (safeToRetry && e .isRetryable ()) {
2501- span .addAnnotation ("Stream broken. Safe to retry" ,
2502- TraceUtil .getExceptionAnnotations (e ));
2526+ span .addAnnotation (
2527+ "Stream broken. Safe to retry" , TraceUtil .getExceptionAnnotations (e ));
25032528 logger .log (Level .FINE , "Retryable exception, will sleep and retry" , e );
25042529 // Truncate any items in the buffer before the last retry token.
25052530 while (!buffer .isEmpty () && buffer .getLast ().getResumeToken ().isEmpty ()) {
0 commit comments