@@ -234,39 +234,41 @@ private CompletableFuture<Result<WriteOk, Err>> write0(final RequestContext reqC
234234 }
235235
236236 final Err err = r .getErr ();
237- LOG .warn ("Failed to write to {}, retries={}, err={}." , Utils .DB_NAME , retries , err );
238- if (retries + 1 > this .opts .getMaxRetries ()) {
239- LOG .error ("Retried {} times still failed." , retries );
240- return Utils .completedCf (r );
241- }
237+ LOG .warn ("Failed to write to {}, err={}." , Utils .DB_NAME , err );
242238
243239 // Should refresh route table
244240 final Set <String > toRefresh = err .stream () //
245241 .filter (Utils ::shouldRefreshRouteTable ) //
246242 .flatMap (e -> e .getFailedWrites ().stream ()) //
247243 .map (Point ::getTable ) //
248244 .collect (Collectors .toSet ());
245+ this .routerClient .clearRouteCacheBy (toRefresh );
249246
250247 // Should retry
251248 final List <Point > pointsToRetry = err .stream () //
252249 .filter (Utils ::shouldRetry ) //
253250 .flatMap (e -> e .getFailedWrites ().stream ()) //
254251 .collect (Collectors .toList ());
252+ if (pointsToRetry .isEmpty ()) {
253+ return Utils .completedCf (r );
254+ }
255255
256- // Should not retry
257- final Optional < Err > noRetryErr = err . stream () //
258- . filter ( Utils :: shouldNotRetry ) //
259- . reduce ( Err :: combine );
256+ if ( retries + 1 > this . opts . getMaxRetries ()) {
257+ LOG . error ( "Retried {} times still failed." , retries );
258+ return Utils . completedCf ( r );
259+ }
260260
261- // Async refresh route info
262- final CompletableFuture <Result <WriteOk , Err >> rwf = this .routerClient
263- .routeRefreshFor (reqCtx , toRefresh )
261+ final CompletableFuture <Result <WriteOk , Err >> rwf = this .routerClient .routeFor (reqCtx , toRefresh )
264262 // Even for some data that does not require a refresh of the routing table,
265263 // we still wait until the routing table is flushed successfully before
266264 // retrying it, in order to give the server a break.
267265 .thenComposeAsync (routes -> write0 (reqCtx , pointsToRetry , ctx , retries + 1 ),
268266 this .asyncPool );
269267
268+ // Should not retry
269+ final Optional <Err > noRetryErr = err .stream () //
270+ .filter (Utils ::shouldNotRetry ) //
271+ .reduce (Err ::combine );
270272 return noRetryErr .isPresent () ?
271273 rwf .thenApplyAsync (ret -> Utils .combineResult (noRetryErr .get ().mapToResult (), ret ),
272274 this .asyncPool ) :
0 commit comments