Skip to content

Commit c2ae349

Browse files
authored
Spanner Gapic Migration: fix updateDatabaseDdl (#3403)
* Change stub, databaseStub and instanceStub to spannerStub, databaseAdminStub and instanceAdminStub Return the original operation if updatabaseDdl fails with ALREADY_EXISTS
1 parent 174443b commit c2ae349

4 files changed

Lines changed: 71 additions & 62 deletions

File tree

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -489,10 +489,6 @@ public OperationFuture<Void, UpdateDatabaseDdlMetadata> updateDatabaseDdl(
489489
throws SpannerException {
490490
final String dbName = getDatabaseName(instanceId, databaseId);
491491
final String opId = operationId != null ? operationId : randomOperationId();
492-
// TODO(hzyi)
493-
// Spanner checks the exception and if the error code is ALREADY_EXISTS
494-
// it creates a new Operation instead of throwing the exception. This
495-
// feature is not implemented in this PR but will come later
496492
OperationFuture<Empty, UpdateDatabaseDdlMetadata> rawOperationFuture =
497493
rpc.updateDatabaseDdl(dbName, statements, opId);
498494
return new OperationFutureImpl(

google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 54 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
2020

21-
import com.google.common.base.Preconditions;
2221
import com.google.api.core.ApiFunction;
2322
import com.google.api.gax.core.CredentialsProvider;
2423
import com.google.api.gax.core.GaxProperties;
@@ -27,15 +26,15 @@
2726
import com.google.api.gax.grpc.GrpcCallContext;
2827
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
2928
import com.google.api.gax.longrunning.OperationFuture;
29+
import com.google.api.gax.rpc.AlreadyExistsException;
3030
import com.google.api.gax.rpc.ApiClientHeaderProvider;
31-
import com.google.api.gax.rpc.FixedTransportChannelProvider;
3231
import com.google.api.gax.rpc.HeaderProvider;
33-
import com.google.api.gax.rpc.ServerStream;
32+
import com.google.api.gax.rpc.OperationCallable;
33+
import com.google.api.gax.rpc.ResponseObserver;
3434
import com.google.api.gax.rpc.StatusCode;
35+
import com.google.api.gax.rpc.StreamController;
3536
import com.google.api.gax.rpc.TransportChannelProvider;
3637
import com.google.api.gax.rpc.UnaryCallSettings;
37-
import com.google.api.gax.rpc.ResponseObserver;
38-
import com.google.api.gax.rpc.StreamController;
3938
import com.google.api.pathtemplate.PathTemplate;
4039
import com.google.cloud.ServiceOptions;
4140
import com.google.cloud.grpc.GrpcTransportOptions;
@@ -53,6 +52,7 @@
5352
import com.google.cloud.spanner.v1.stub.SpannerStub;
5453
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
5554
import com.google.common.base.MoreObjects;
55+
import com.google.common.base.Preconditions;
5656
import com.google.common.collect.ImmutableSet;
5757
import com.google.longrunning.GetOperationRequest;
5858
import com.google.longrunning.Operation;
@@ -108,12 +108,13 @@ public class GapicSpannerRpc implements SpannerRpc {
108108

109109
private static final PathTemplate PROJECT_NAME_TEMPLATE =
110110
PathTemplate.create("projects/{project}");
111+
private static final PathTemplate OPERATION_NAME_TEMPLATE =
112+
PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}");
111113
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
112-
113-
// TODO(hzyi): change the stub names to be more intuitive
114-
private final SpannerStub stub;
115-
private final InstanceAdminStub instanceStub;
116-
private final DatabaseAdminStub databaseStub;
114+
115+
private final SpannerStub spannerStub;
116+
private final InstanceAdminStub instanceAdminStub;
117+
private final DatabaseAdminStub databaseAdminStub;
117118
private final String projectId;
118119
private final String projectName;
119120
private final SpannerMetadataProvider metadataProvider;
@@ -126,9 +127,6 @@ public GapicSpannerRpc(SpannerOptions options) {
126127
this.projectId = options.getProjectId();
127128
this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId);
128129

129-
// TODO(hzyi): inject userAgent to headerProvider so that it
130-
// can be picked up by ChannelProvider
131-
132130
// create a metadataProvider which combines both internal headers and
133131
// per-method-call extra headers for channelProvider to inject the headers
134132
// for rpc calls
@@ -177,7 +175,7 @@ public GapicSpannerRpc(SpannerOptions options) {
177175
try {
178176
// TODO: bump the version of gax and remove this try-catch block
179177
// applyToAllUnaryMethods does not throw exception in the latest version
180-
this.stub =
178+
this.spannerStub =
181179
GrpcSpannerStub.create(
182180
SpannerStubSettings.newBuilder()
183181
.setTransportChannelProvider(channelProvider)
@@ -192,7 +190,7 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
192190
})
193191
.build());
194192

195-
this.instanceStub =
193+
this.instanceAdminStub =
196194
GrpcInstanceAdminStub.create(
197195
InstanceAdminStubSettings.newBuilder()
198196
.setTransportChannelProvider(channelProvider)
@@ -206,7 +204,7 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
206204
}
207205
})
208206
.build());
209-
this.databaseStub =
207+
this.databaseAdminStub =
210208
GrpcDatabaseAdminStub.create(
211209
DatabaseAdminStubSettings.newBuilder()
212210
.setTransportChannelProvider(channelProvider)
@@ -237,7 +235,7 @@ public Paginated<InstanceConfig> listInstanceConfigs(int pageSize, @Nullable Str
237235

238236
GrpcCallContext context = newCallContext(null, projectName);
239237
ListInstanceConfigsResponse response =
240-
get(instanceStub.listInstanceConfigsCallable().futureCall(request, context));
238+
get(instanceAdminStub.listInstanceConfigsCallable().futureCall(request, context));
241239
return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken());
242240
}
243241

@@ -247,7 +245,7 @@ public InstanceConfig getInstanceConfig(String instanceConfigName) throws Spanne
247245
GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build();
248246

249247
GrpcCallContext context = newCallContext(null, projectName);
250-
return get(instanceStub.getInstanceConfigCallable().futureCall(request, context));
248+
return get(instanceAdminStub.getInstanceConfigCallable().futureCall(request, context));
251249
}
252250

253251
@Override
@@ -265,7 +263,7 @@ public Paginated<Instance> listInstances(
265263

266264
GrpcCallContext context = newCallContext(null, projectName);
267265
ListInstancesResponse response =
268-
get(instanceStub.listInstancesCallable().futureCall(request, context));
266+
get(instanceAdminStub.listInstancesCallable().futureCall(request, context));
269267
return new Paginated<>(response.getInstancesList(), response.getNextPageToken());
270268
}
271269

@@ -279,7 +277,7 @@ public OperationFuture<Instance, CreateInstanceMetadata> createInstance(
279277
.setInstance(instance)
280278
.build();
281279
GrpcCallContext context = newCallContext(null, parent);
282-
return instanceStub.createInstanceOperationCallable().futureCall(request, context);
280+
return instanceAdminStub.createInstanceOperationCallable().futureCall(request, context);
283281
}
284282

285283
@Override
@@ -288,7 +286,7 @@ public OperationFuture<Instance, UpdateInstanceMetadata> updateInstance(
288286
UpdateInstanceRequest request =
289287
UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build();
290288
GrpcCallContext context = newCallContext(null, instance.getName());
291-
return instanceStub.updateInstanceOperationCallable().futureCall(request, context);
289+
return instanceAdminStub.updateInstanceOperationCallable().futureCall(request, context);
292290
}
293291

294292
@Override
@@ -297,7 +295,7 @@ public Instance getInstance(String instanceName) throws SpannerException {
297295
GetInstanceRequest.newBuilder().setName(instanceName).build();
298296

299297
GrpcCallContext context = newCallContext(null, instanceName);
300-
return get(instanceStub.getInstanceCallable().futureCall(request, context));
298+
return get(instanceAdminStub.getInstanceCallable().futureCall(request, context));
301299
}
302300

303301
@Override
@@ -306,7 +304,7 @@ public void deleteInstance(String instanceName) throws SpannerException {
306304
DeleteInstanceRequest.newBuilder().setName(instanceName).build();
307305

308306
GrpcCallContext context = newCallContext(null, instanceName);
309-
get(instanceStub.deleteInstanceCallable().futureCall(request, context));
307+
get(instanceAdminStub.deleteInstanceCallable().futureCall(request, context));
310308
}
311309

312310
@Override
@@ -320,7 +318,7 @@ public Paginated<Database> listDatabases(
320318
ListDatabasesRequest request = requestBuilder.build();
321319

322320
GrpcCallContext context = newCallContext(null, instanceName);
323-
ListDatabasesResponse response = get(databaseStub.listDatabasesCallable()
321+
ListDatabasesResponse response = get(databaseAdminStub.listDatabasesCallable()
324322
.futureCall(request, context));
325323
return new Paginated<>(response.getDatabasesList(), response.getNextPageToken());
326324
}
@@ -335,7 +333,7 @@ public OperationFuture<Database, CreateDatabaseMetadata> createDatabase(
335333
.addAllExtraStatements(additionalStatements)
336334
.build();
337335
GrpcCallContext context = newCallContext(null, instanceName);
338-
return databaseStub.createDatabaseOperationCallable().futureCall(request, context);
336+
return databaseAdminStub.createDatabaseOperationCallable().futureCall(request, context);
339337
}
340338

341339
@Override
@@ -348,7 +346,21 @@ public OperationFuture<Empty, UpdateDatabaseDdlMetadata> updateDatabaseDdl(
348346
.setOperationId(MoreObjects.firstNonNull(updateId, ""))
349347
.build();
350348
GrpcCallContext context = newCallContext(null, databaseName);
351-
return databaseStub.updateDatabaseDdlOperationCallable().futureCall(request, context);
349+
OperationCallable<UpdateDatabaseDdlRequest, Empty, UpdateDatabaseDdlMetadata> callable = databaseAdminStub.updateDatabaseDdlOperationCallable();
350+
OperationFuture<Empty, UpdateDatabaseDdlMetadata> operationFuture = callable.futureCall(request, context);
351+
try {
352+
operationFuture.getInitialFuture().get();
353+
} catch (InterruptedException e) {
354+
throw SpannerExceptionFactory.newSpannerException(e);
355+
} catch (ExecutionException e) {
356+
Throwable t = e.getCause();
357+
if (t instanceof AlreadyExistsException) {
358+
String operationName =
359+
OPERATION_NAME_TEMPLATE.instantiate("database", databaseName, "operation", updateId);
360+
return callable.resumeFutureCall(operationName, context);
361+
}
362+
}
363+
return operationFuture;
352364
}
353365

354366
@Override
@@ -357,7 +369,7 @@ public void dropDatabase(String databaseName) throws SpannerException {
357369
DropDatabaseRequest.newBuilder().setDatabase(databaseName).build();
358370

359371
GrpcCallContext context = newCallContext(null, databaseName);
360-
get(databaseStub.dropDatabaseCallable().futureCall(request, context));
372+
get(databaseAdminStub.dropDatabaseCallable().futureCall(request, context));
361373
}
362374

363375
@Override
@@ -368,7 +380,7 @@ public Database getDatabase(String databaseName) throws SpannerException {
368380
.build();
369381

370382
GrpcCallContext context = newCallContext(null, databaseName);
371-
return get(databaseStub.getDatabaseCallable().futureCall(request, context));
383+
return get(databaseAdminStub.getDatabaseCallable().futureCall(request, context));
372384
}
373385

374386
@Override
@@ -377,15 +389,15 @@ public List<String> getDatabaseDdl(String databaseName) throws SpannerException
377389
GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build();
378390

379391
GrpcCallContext context = newCallContext(null, databaseName);
380-
return get(databaseStub.getDatabaseDdlCallable().futureCall(request, context))
392+
return get(databaseAdminStub.getDatabaseDdlCallable().futureCall(request, context))
381393
.getStatementsList();
382394
}
383395

384396
@Override
385397
public Operation getOperation(String name) throws SpannerException {
386398
GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build();
387399
GrpcCallContext context = newCallContext(null, name);
388-
return get(databaseStub.getOperationsStub().getOperationCallable()
400+
return get(databaseAdminStub.getOperationsStub().getOperationCallable()
389401
.futureCall(request, context));
390402
}
391403

@@ -400,7 +412,7 @@ public Session createSession(String databaseName, @Nullable Map<String, String>
400412
}
401413
CreateSessionRequest request = requestBuilder.build();
402414
GrpcCallContext context = newCallContext(options, databaseName);
403-
return get(stub.createSessionCallable().futureCall(request, context));
415+
return get(spannerStub.createSessionCallable().futureCall(request, context));
404416
}
405417

406418
@Override
@@ -409,15 +421,15 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
409421
DeleteSessionRequest request =
410422
DeleteSessionRequest.newBuilder().setName(sessionName).build();
411423
GrpcCallContext context = newCallContext(options, sessionName);
412-
get(stub.deleteSessionCallable().futureCall(request, context));
424+
get(spannerStub.deleteSessionCallable().futureCall(request, context));
413425
}
414426

415427
@Override
416428
public StreamingCall read(
417429
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
418430
GrpcCallContext context = newCallContext(options, request.getSession());
419431
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
420-
stub.streamingReadCallable().call(request, responseObserver, context);
432+
spannerStub.streamingReadCallable().call(request, responseObserver, context);
421433
final StreamController controller = responseObserver.getController();
422434
return new StreamingCall() {
423435
@Override
@@ -439,7 +451,7 @@ public StreamingCall executeQuery(
439451
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
440452
GrpcCallContext context = newCallContext(options, request.getSession());
441453
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
442-
stub.executeStreamingSqlCallable().call(request, responseObserver, context);
454+
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
443455
final StreamController controller = responseObserver.getController();
444456
return new StreamingCall() {
445457
@Override
@@ -460,35 +472,35 @@ public void cancel(String message) {
460472
public Transaction beginTransaction(
461473
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
462474
GrpcCallContext context = newCallContext(options, request.getSession());
463-
return get(stub.beginTransactionCallable().futureCall(request, context));
475+
return get(spannerStub.beginTransactionCallable().futureCall(request, context));
464476
}
465477

466478
@Override
467479
public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> options)
468480
throws SpannerException {
469481
GrpcCallContext context = newCallContext(options, commitRequest.getSession());
470-
return get(stub.commitCallable().futureCall(commitRequest, context));
482+
return get(spannerStub.commitCallable().futureCall(commitRequest, context));
471483
}
472484

473485
@Override
474486
public void rollback(RollbackRequest request, @Nullable Map<Option, ?> options)
475487
throws SpannerException {
476488
GrpcCallContext context = newCallContext(options, request.getSession());
477-
get(stub.rollbackCallable().futureCall(request, context));
489+
get(spannerStub.rollbackCallable().futureCall(request, context));
478490
}
479491

480492
@Override
481493
public PartitionResponse partitionQuery(
482494
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
483495
GrpcCallContext context = newCallContext(options, request.getSession());
484-
return get(stub.partitionQueryCallable().futureCall(request, context));
496+
return get(spannerStub.partitionQueryCallable().futureCall(request, context));
485497
}
486498

487499
@Override
488500
public PartitionResponse partitionRead(
489501
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
490502
GrpcCallContext context = newCallContext(options, request.getSession());
491-
return get(stub.partitionReadCallable().futureCall(request, context));
503+
return get(spannerStub.partitionReadCallable().futureCall(request, context));
492504
}
493505

494506
/** Gets the result of an async RPC call, handling any exceptions encountered. */
@@ -516,9 +528,9 @@ private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String
516528
}
517529

518530
public void shutdown() {
519-
this.stub.close();
520-
this.instanceStub.close();
521-
this.databaseStub.close();
531+
this.spannerStub.close();
532+
this.instanceAdminStub.close();
533+
this.databaseAdminStub.close();
522534
}
523535

524536
/**

google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,21 +115,25 @@ public void updateDatabaseDdl() throws Exception {
115115
assertThat(op.getName()).isEqualTo(opName);
116116
}
117117

118-
@Ignore("More work needs to be done")
119118
@Test
120-
// TODO(hzyi)
121-
// Changing the surface to OperationFuture breaks updateDatabaseDdl in the case
122-
// that there is already a longrunning operation running. Disabling this test for
123-
// this PR and I will fix this in the next PR.
124119
public void updateDatabaseDdlOpAlreadyExists() throws Exception {
125-
String opName = DB_NAME + "/operations/myop";
126-
String opId = "myop";
120+
String originalOpName = DB_NAME + "/operations/originalop";
121+
String originalOpId = "originalop";
127122
List<String> ddl = ImmutableList.of();
128-
when(rpc.updateDatabaseDdl(DB_NAME, ddl, opId))
129-
.thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, ""));
123+
OperationFuture<Empty, UpdateDatabaseDdlMetadata> originalOp =
124+
OperationFutureUtil.immediateOperationFuture(
125+
originalOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance());
126+
127+
String newOpName = DB_NAME + "/operations/newop";
128+
String newOpId = "newop";
129+
OperationFuture<Empty, UpdateDatabaseDdlMetadata> newop =
130+
OperationFutureUtil.immediateOperationFuture(
131+
newOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance());
132+
133+
when(rpc.updateDatabaseDdl(DB_NAME, ddl, newOpId)).thenReturn(originalOp);
130134
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
131-
client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, opId);
132-
assertThat(op.getName()).isEqualTo(opName);
135+
client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, newOpId);
136+
assertThat(op.getName()).isEqualTo(originalOpName);
133137
}
134138

135139
@Test

google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,7 @@ public void databaseOperations() throws Exception {
108108
db = dbAdminClient.getDatabase(testHelper.getInstanceId().getInstance(), dbId);
109109
}
110110

111-
@Ignore("More work needs to be done")
112111
@Test
113-
// TODO(hzyi)
114-
// Changing the surface to OperationFuture breaks updateDatabaseDdl in the case
115-
// that there is already a longrunning operation running. Disabling this test for
116-
// this PR and I will fix this in the next PR.
117112
public void updateDdlRetry() throws Exception {
118113
String dbId = testHelper.getUniqueDatabaseId();
119114
String instanceId = testHelper.getInstanceId().getInstance();
@@ -127,6 +122,8 @@ public void updateDdlRetry() throws Exception {
127122
dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop");
128123
OperationFuture<Void, UpdateDatabaseDdlMetadata> op2 =
129124
dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop");
125+
op1.get();
126+
op2.get();
130127
assertThat(op1.getMetadata().get()).isEqualTo(op2.getMetadata().get());
131128
}
132129

0 commit comments

Comments
 (0)