Skip to content

Commit 7ae4d2e

Browse files
authored
fix: retry CreateSession also when waitForMinSessions is zero (#4360)
* fix: retry CreateSession also when waitForMinSessions is zero The creation of a session at startup would only be retried if waitForMinSessions was set. This changed introduces retries for CreateSession at startup if the error code is potentially transient. The number of retries is set to maximum 10. * chore: address review comment + fix test
1 parent fdbeb0e commit 7ae4d2e

File tree

3 files changed

+366
-5
lines changed

3 files changed

+366
-5
lines changed

java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@
5353
* transactions.
5454
*/
5555
final class MultiplexedSessionDatabaseClient extends AbstractMultiplexedSessionDatabaseClient {
56+
/**
57+
* The maximum number of attempts that the client will try to execute CreateSession for the
58+
* initial multiplexed session. This value is only used for the very first multiplexed session
59+
* that is created, and it is only used if the application has not set a waitForMinSessions value.
60+
* If waitForMinSessions has been set, then the client will retry until the duration in
61+
* waitForMinSessions has been reached.
62+
*/
63+
private static final int MAX_INITIAL_CREATE_SESSION_ATTEMPTS = 10;
64+
5665
@VisibleForTesting
5766
static final Statement DETERMINE_DIALECT_STATEMENT =
5867
Statement.newBuilder(
@@ -226,14 +235,19 @@ public void close() {
226235
final SettableApiFuture<SessionReference> initialSessionReferenceFuture =
227236
SettableApiFuture.create();
228237
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
229-
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
238+
239+
Duration waitDuration =
240+
sessionClient.getSpanner().getOptions().getSessionPoolOptions().getWaitForMinSessions();
241+
int initialAttempts =
242+
waitDuration == null || waitDuration.isZero() ? MAX_INITIAL_CREATE_SESSION_ATTEMPTS : 1;
243+
asyncCreateMultiplexedSession(initialSessionReferenceFuture, initialAttempts);
230244
maybeWaitForSessionCreation(
231245
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
232246
initialSessionReferenceFuture);
233247
}
234248

235249
private void asyncCreateMultiplexedSession(
236-
SettableApiFuture<SessionReference> sessionReferenceFuture) {
250+
SettableApiFuture<SessionReference> sessionReferenceFuture, int remainingAttempts) {
237251
this.sessionClient.asyncCreateMultiplexedSession(
238252
new SessionConsumer() {
239253
@Override
@@ -263,7 +277,15 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
263277
MultiplexedSessionDatabaseClient.this.resourceNotFoundException.set(
264278
(ResourceNotFoundException) spannerException);
265279
}
280+
// Set the exception to trigger an error for all waiters.
281+
// Then retry the session creation if the error is (potentially) transient.
266282
sessionReferenceFuture.setException(t);
283+
if (remainingAttempts > 1
284+
&& RETRYABLE_ERROR_CODES.contains(spannerException.getErrorCode())) {
285+
final SettableApiFuture<SessionReference> future = SettableApiFuture.create();
286+
MultiplexedSessionDatabaseClient.this.multiplexedSessionReference.set(future);
287+
asyncCreateMultiplexedSession(future, remainingAttempts - 1);
288+
}
267289
}
268290
});
269291
}
@@ -283,7 +305,7 @@ private void maybeWaitForSessionCreation(
283305
// If any exception is thrown, then retry the multiplexed session creation
284306
if (sessionReferenceFuture == null) {
285307
sessionReferenceFuture = SettableApiFuture.create();
286-
asyncCreateMultiplexedSession(sessionReferenceFuture);
308+
asyncCreateMultiplexedSession(sessionReferenceFuture, 1);
287309
this.multiplexedSessionReference.set(sessionReferenceFuture);
288310
}
289311
try {
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
23+
import com.google.cloud.NoCredentials;
24+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
25+
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
26+
import com.google.cloud.spanner.connection.AbstractMockServerTest;
27+
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
28+
import com.google.common.collect.ImmutableList;
29+
import com.google.spanner.v1.BeginTransactionRequest;
30+
import com.google.spanner.v1.CommitRequest;
31+
import com.google.spanner.v1.ReadRequest;
32+
import io.grpc.ManagedChannelBuilder;
33+
import io.grpc.Status;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.junit.runners.JUnit4;
39+
40+
@RunWith(JUnit4.class)
41+
public class ExcludeFromChangeStreamTest extends AbstractMockServerTest {
42+
43+
@BeforeClass
44+
public static void setupReadResult() {
45+
RandomResultSetGenerator generator = new RandomResultSetGenerator(10);
46+
mockSpanner.putStatementResult(
47+
StatementResult.query(
48+
Statement.of("SELECT my-column FROM my-table WHERE 1=1"), generator.generate()));
49+
}
50+
51+
private Spanner createSpanner() {
52+
return SpannerOptions.newBuilder()
53+
.setProjectId("fake-project")
54+
.setHost("http://localhost:" + getPort())
55+
.setCredentials(NoCredentials.getInstance())
56+
.setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
57+
.build()
58+
.getService();
59+
}
60+
61+
@Test
62+
public void testStandardTransaction() {
63+
try (Spanner spanner = createSpanner()) {
64+
for (int i = 0; i < 10; i++) {
65+
DatabaseClient client =
66+
spanner.getDatabaseClient(
67+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
68+
client
69+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
70+
.run(
71+
transaction -> {
72+
try (ResultSet resultSet =
73+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
74+
while (resultSet.next()) {}
75+
}
76+
transaction.buffer(
77+
Mutation.newInsertOrUpdateBuilder("my-table")
78+
.set("my-column")
79+
.to(1L)
80+
.build());
81+
return null;
82+
});
83+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
84+
assertEquals(1, mockSpanner.countRequestsOfType(ReadRequest.class));
85+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
86+
87+
ReadRequest readRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
88+
assertTrue(readRequest.hasTransaction());
89+
assertTrue(readRequest.getTransaction().hasBegin());
90+
assertTrue(readRequest.getTransaction().getBegin().hasReadWrite());
91+
assertTrue(readRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
92+
93+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
94+
assertNotNull(commitRequest.getTransactionId());
95+
96+
mockSpanner.clearRequests();
97+
}
98+
}
99+
}
100+
101+
@Test
102+
public void testTransactionAbortedDuringRead() {
103+
try (Spanner spanner = createSpanner()) {
104+
for (int i = 0; i < 10; i++) {
105+
DatabaseClient client =
106+
spanner.getDatabaseClient(
107+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
108+
AtomicBoolean hasAborted = new AtomicBoolean(false);
109+
client
110+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
111+
.run(
112+
transaction -> {
113+
if (hasAborted.compareAndSet(false, true)) {
114+
mockSpanner.abortNextStatement();
115+
}
116+
try (ResultSet resultSet =
117+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
118+
while (resultSet.next()) {}
119+
}
120+
transaction.buffer(
121+
Mutation.newInsertOrUpdateBuilder("my-table")
122+
.set("my-column")
123+
.to(1L)
124+
.build());
125+
return null;
126+
});
127+
assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
128+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
129+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
130+
131+
BeginTransactionRequest beginRequest =
132+
mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0);
133+
assertTrue(beginRequest.getOptions().hasReadWrite());
134+
assertTrue(beginRequest.getOptions().getExcludeTxnFromChangeStreams());
135+
136+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
137+
assertTrue(firstReadRequest.hasTransaction());
138+
assertTrue(firstReadRequest.getTransaction().hasBegin());
139+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
140+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
141+
142+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
143+
assertTrue(secondReadRequest.hasTransaction());
144+
assertTrue(secondReadRequest.getTransaction().hasId());
145+
146+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
147+
assertNotNull(commitRequest.getTransactionId());
148+
149+
mockSpanner.clearRequests();
150+
}
151+
}
152+
}
153+
154+
@Test
155+
public void testTransactionAbortedDuringCommit() {
156+
try (Spanner spanner = createSpanner()) {
157+
for (int i = 0; i < 10; i++) {
158+
DatabaseClient client =
159+
spanner.getDatabaseClient(
160+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
161+
AtomicBoolean hasAborted = new AtomicBoolean(false);
162+
client
163+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
164+
.run(
165+
transaction -> {
166+
try (ResultSet resultSet =
167+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
168+
while (resultSet.next()) {}
169+
}
170+
if (hasAborted.compareAndSet(false, true)) {
171+
mockSpanner.abortNextStatement();
172+
}
173+
transaction.buffer(
174+
Mutation.newInsertOrUpdateBuilder("my-table")
175+
.set("my-column")
176+
.to(1L)
177+
.build());
178+
return null;
179+
});
180+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
181+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
182+
assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class));
183+
184+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
185+
assertTrue(firstReadRequest.hasTransaction());
186+
assertTrue(firstReadRequest.getTransaction().hasBegin());
187+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
188+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
189+
190+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
191+
assertTrue(secondReadRequest.hasTransaction());
192+
assertTrue(secondReadRequest.getTransaction().hasBegin());
193+
assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite());
194+
assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
195+
196+
for (CommitRequest commitRequest : mockSpanner.getRequestsOfType(CommitRequest.class)) {
197+
assertNotNull(commitRequest.getTransactionId());
198+
}
199+
mockSpanner.clearRequests();
200+
}
201+
}
202+
}
203+
204+
@Test
205+
public void testReadReturnsUnavailable() {
206+
207+
try (Spanner spanner = createSpanner()) {
208+
for (int i = 0; i < 10; i++) {
209+
mockSpanner.setStreamingReadExecutionTime(
210+
SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException()));
211+
DatabaseClient client =
212+
spanner.getDatabaseClient(
213+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
214+
client
215+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
216+
.run(
217+
transaction -> {
218+
try (ResultSet resultSet =
219+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
220+
while (resultSet.next()) {}
221+
}
222+
transaction.buffer(
223+
Mutation.newInsertOrUpdateBuilder("my-table")
224+
.set("my-column")
225+
.to(1L)
226+
.build());
227+
return null;
228+
});
229+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
230+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
231+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
232+
233+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
234+
assertTrue(firstReadRequest.hasTransaction());
235+
assertTrue(firstReadRequest.getTransaction().hasBegin());
236+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
237+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
238+
239+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
240+
assertTrue(secondReadRequest.hasTransaction());
241+
assertTrue(secondReadRequest.getTransaction().hasBegin());
242+
assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite());
243+
assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
244+
245+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
246+
assertNotNull(commitRequest.getTransactionId());
247+
248+
mockSpanner.clearRequests();
249+
}
250+
}
251+
}
252+
253+
@Test
254+
public void testReadReturnsUnavailableHalfway() {
255+
try (Spanner spanner = createSpanner()) {
256+
for (int i = 0; i < 10; i++) {
257+
mockSpanner.setStreamingReadExecutionTime(
258+
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 2));
259+
260+
DatabaseClient client =
261+
spanner.getDatabaseClient(
262+
DatabaseId.of("fake-project", "fake-instance", "fake-database"));
263+
client
264+
.readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams())
265+
.run(
266+
transaction -> {
267+
try (ResultSet resultSet =
268+
transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) {
269+
while (resultSet.next()) {}
270+
}
271+
transaction.buffer(
272+
Mutation.newInsertOrUpdateBuilder("my-table")
273+
.set("my-column")
274+
.to(1L)
275+
.build());
276+
return null;
277+
});
278+
assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class));
279+
assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class));
280+
assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class));
281+
282+
ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0);
283+
assertTrue(firstReadRequest.hasTransaction());
284+
assertTrue(firstReadRequest.getTransaction().hasBegin());
285+
assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite());
286+
assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams());
287+
288+
ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1);
289+
assertTrue(secondReadRequest.hasTransaction());
290+
assertTrue(secondReadRequest.getTransaction().hasId());
291+
292+
CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0);
293+
assertNotNull(commitRequest.getTransactionId());
294+
295+
mockSpanner.clearRequests();
296+
}
297+
}
298+
}
299+
}

0 commit comments

Comments
 (0)