Skip to content
This repository was archived by the owner on Mar 12, 2026. It is now read-only.

Commit d42ca5b

Browse files
author
mrrtree
authored
fix: clear route while error occurs (#53)
* fix: clear route while error occurs * fix: grpc metric leak
1 parent 40c2983 commit d42ca5b

8 files changed

Lines changed: 38 additions & 296 deletions

File tree

ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,8 @@ private long onReceived(final boolean onError) {
262262
MetricsUtil.timer(REQ_RT, method.getFullMethodName(), address).update(duration, TimeUnit.MILLISECONDS);
263263

264264
if (onError) {
265-
MetricsUtil.meter(REQ_FAILED, method).mark();
266-
MetricsUtil.meter(REQ_FAILED, method, address).mark();
265+
MetricsUtil.meter(REQ_FAILED, method.getFullMethodName()).mark();
266+
MetricsUtil.meter(REQ_FAILED, method.getFullMethodName(), address).mark();
267267
}
268268

269269
return duration;

ceresdb-protocol/src/main/java/io/ceresdb/QueryClient.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,24 +160,26 @@ private CompletableFuture<Result<SqlQueryOk, Err>> query0(final SqlQueryRequest
160160
}
161161

162162
final Err err = r.getErr();
163-
LOG.warn("Failed to read from {}, retries={}, err={}.", Utils.DB_NAME, retries, err);
164-
if (retries > this.opts.getMaxRetries()) {
165-
LOG.error("Retried {} times still failed.", retries);
166-
return Utils.completedCf(r);
167-
}
163+
LOG.warn("Failed to read from {}, err={}.", Utils.DB_NAME, err);
168164

169165
// Should refresh route table
170166
final Set<String> toRefresh = err.stream() //
171167
.filter(Utils::shouldRefreshRouteTable) //
172168
.flatMap(e -> e.getFailedTables().stream()) //
173169
.collect(Collectors.toSet());
170+
this.routerClient.clearRouteCacheBy(toRefresh);
174171

175-
if (toRefresh.isEmpty()) {
172+
// Should not retry
173+
if (Utils.shouldNotRetry(err)) {
176174
return Utils.completedCf(r);
177175
}
178176

179177
// Async to refresh route info
180-
return this.routerClient.routeRefreshFor(req.getReqCtx(), toRefresh)
178+
if (retries > this.opts.getMaxRetries()) {
179+
LOG.error("Retried {} times still failed.", retries);
180+
return Utils.completedCf(r);
181+
}
182+
return this.routerClient.routeFor(req.getReqCtx(), toRefresh)
181183
.thenComposeAsync(routes -> query0(req, ctx, retries + 1), this.asyncPool);
182184
}, this.asyncPool);
183185
}

ceresdb-protocol/src/main/java/io/ceresdb/WriteClient.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -234,39 +234,41 @@ private CompletableFuture<Result<WriteOk, Err>> write0(final RequestContext reqC
234234
}
235235

236236
final Err err = r.getErr();
237-
LOG.warn("Failed to write to {}, retries={}, err={}.", Utils.DB_NAME, retries, err);
238-
if (retries + 1 > this.opts.getMaxRetries()) {
239-
LOG.error("Retried {} times still failed.", retries);
240-
return Utils.completedCf(r);
241-
}
237+
LOG.warn("Failed to write to {}, err={}.", Utils.DB_NAME, err);
242238

243239
// Should refresh route table
244240
final Set<String> toRefresh = err.stream() //
245241
.filter(Utils::shouldRefreshRouteTable) //
246242
.flatMap(e -> e.getFailedWrites().stream()) //
247243
.map(Point::getTable) //
248244
.collect(Collectors.toSet());
245+
this.routerClient.clearRouteCacheBy(toRefresh);
249246

250247
// Should retry
251248
final List<Point> pointsToRetry = err.stream() //
252249
.filter(Utils::shouldRetry) //
253250
.flatMap(e -> e.getFailedWrites().stream()) //
254251
.collect(Collectors.toList());
252+
if (pointsToRetry.isEmpty()) {
253+
return Utils.completedCf(r);
254+
}
255255

256-
// Should not retry
257-
final Optional<Err> noRetryErr = err.stream() //
258-
.filter(Utils::shouldNotRetry) //
259-
.reduce(Err::combine);
256+
if (retries + 1 > this.opts.getMaxRetries()) {
257+
LOG.error("Retried {} times still failed.", retries);
258+
return Utils.completedCf(r);
259+
}
260260

261-
// Async refresh route info
262-
final CompletableFuture<Result<WriteOk, Err>> rwf = this.routerClient
263-
.routeRefreshFor(reqCtx, toRefresh)
261+
final CompletableFuture<Result<WriteOk, Err>> rwf = this.routerClient.routeFor(reqCtx, toRefresh)
264262
// Even for some data that does not require a refresh of the routing table,
265263
// we still wait until the routing table is flushed successfully before
266264
// retrying it, in order to give the server a break.
267265
.thenComposeAsync(routes -> write0(reqCtx, pointsToRetry, ctx, retries + 1),
268266
this.asyncPool);
269267

268+
// Should not retry
269+
final Optional<Err> noRetryErr = err.stream() //
270+
.filter(Utils::shouldNotRetry) //
271+
.reduce(Err::combine);
270272
return noRetryErr.isPresent() ?
271273
rwf.thenApplyAsync(ret -> Utils.combineResult(noRetryErr.get().mapToResult(), ret),
272274
this.asyncPool) :

ceresdb-protocol/src/main/java/io/ceresdb/models/Result.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313
*/
1414
public final class Result<Ok, Err> {
1515

16-
public static final int SUCCESS = 200;
17-
public static final int INVALID_ROUTE = 302;
18-
public static final int SHOULD_RETRY = 310;
19-
public static final int FLOW_CONTROL = 503;
16+
public static final int SUCCESS = 200;
17+
public static final int BAD_REQUEST = 400;
18+
public static final int FLOW_CONTROL = 503;
2019

2120
private final Ok ok;
2221
private final Err err;

ceresdb-protocol/src/main/java/io/ceresdb/util/Utils.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -332,15 +332,11 @@ public static boolean shouldNotRetry(final Err err) {
332332
}
333333

334334
public static boolean shouldRetry(final Err err) {
335-
if (err == null) {
336-
return false;
337-
}
338-
final int errCode = err.getCode();
339-
return errCode == Result.INVALID_ROUTE || errCode == Result.SHOULD_RETRY;
335+
return false;
340336
}
341337

342338
public static boolean shouldRefreshRouteTable(final Err err) {
343-
return err.getCode() == Result.INVALID_ROUTE;
339+
return err != null;
344340
}
345341

346342
public static <V> Observer<V> toUnaryObserver(final CompletableFuture<V> future) {

ceresdb-protocol/src/test/java/io/ceresdb/CeresDBClientTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void after() {
5757

5858
@Test(expected = IllegalStateException.class)
5959
public void withoutInitTest() {
60-
final List<Point> points = TestUtil.newTablePoints("test_table1_not_init");
60+
final List<Point> points = TestUtil.newTableTwoPoints("test_table1_not_init");
6161
this.client.write(new WriteRequest(points));
6262
}
6363

@@ -80,7 +80,7 @@ public void instancesTest() {
8080
public void helloWorldTest() throws ExecutionException, InterruptedException {
8181
initAndMockWriteClient();
8282

83-
final List<Point> points = TestUtil.newTablePoints("test_table1");
83+
final List<Point> points = TestUtil.newTableTwoPoints("test_table1");
8484

8585
Mockito.when(this.writeClient.write(new WriteRequest(Mockito.anyList()), Mockito.any())) //
8686
.thenReturn(Utils.completedCf(WriteOk.ok(2, 0, null).mapToResult()));

0 commit comments

Comments
 (0)