Skip to content

Commit 9bfdc21

Browse files
honnixclaude
andauthored
feat: add retry with exponential backoff for artifact downloads (#326)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a8c6416 commit 9bfdc21

7 files changed

Lines changed: 276 additions & 156 deletions

File tree

.claude/rules/git-signoff.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Git Sign-off
2+
3+
All commits in this repo must be signed off. Always use `--signoff` (or `-s`) when creating commits.

jflyte-utils/src/main/java/org/flyte/jflyte/utils/FlyteAdminClient.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@
3232
import io.grpc.ManagedChannel;
3333
import io.grpc.ManagedChannelBuilder;
3434
import io.grpc.Status;
35+
import io.grpc.Status.Code;
3536
import io.grpc.StatusRuntimeException;
3637
import java.util.List;
3738
import java.util.Map;
39+
import java.util.Set;
3840
import java.util.function.Function;
41+
import java.util.stream.Collectors;
42+
import java.util.stream.Stream;
3943
import javax.annotation.Nullable;
4044
import org.flyte.api.v1.LaunchPlan;
4145
import org.flyte.api.v1.LaunchPlanIdentifier;
@@ -58,13 +62,17 @@ public class FlyteAdminClient implements AutoCloseable {
5862
static final String TRIGGERING_PRINCIPAL = "sdk";
5963
static final int USER_TRIGGERED_EXECUTION_NESTING = 0;
6064

65+
private static final Set<Code> GRPC_RETRYABLE_CODES =
66+
Stream.of(Code.UNAVAILABLE, Code.DEADLINE_EXCEEDED, Code.INTERNAL)
67+
.collect(Collectors.toSet());
68+
6169
private final AdminServiceGrpc.AdminServiceBlockingStub stub;
6270
private final ManagedChannel channel;
63-
private final GrpcRetries retries;
71+
private final Retries retries;
6472

6573
@VisibleForTesting
6674
FlyteAdminClient(
67-
AdminServiceGrpc.AdminServiceBlockingStub stub, ManagedChannel channel, GrpcRetries retries) {
75+
AdminServiceGrpc.AdminServiceBlockingStub stub, ManagedChannel channel, Retries retries) {
6876
this.stub = stub;
6977
this.channel = channel;
7078
this.retries = retries;
@@ -79,7 +87,7 @@ public static FlyteAdminClient create(
7987
}
8088

8189
ManagedChannel originChannel = builder.build();
82-
GrpcRetries retries = GrpcRetries.create();
90+
Retries retries = createGrpcRetries();
8391
if (tokenSource == null) {
8492
// In case of no tokenSource, no need to intercept the grpc call.
8593
return new FlyteAdminClient(
@@ -236,7 +244,7 @@ private <T, RespT> T fetchLatestResource(
236244
return deserializeFn.apply(list.get(0));
237245
}
238246

239-
private <T> void idempotentCreate(String label, Object id, GrpcRetries.Retryable<T> retryable) {
247+
private <T> void idempotentCreate(String label, Object id, Retries.Retryable<T> retryable) {
240248
try {
241249
// create operation is idempotent, so it's fine to retry
242250
T response = retries.retry(retryable);
@@ -252,6 +260,18 @@ private <T> void idempotentCreate(String label, Object id, GrpcRetries.Retryable
252260
}
253261
}
254262

263+
private static Retries createGrpcRetries() {
264+
return Retries.create(
265+
/* maxRetries= */ 10,
266+
/* maxDelayMilliseconds= */ 5_000L,
267+
/* initialDelayMilliseconds= */ 250L,
268+
/* sleeper= */ Thread::sleep,
269+
e ->
270+
e instanceof StatusRuntimeException
271+
&& GRPC_RETRYABLE_CODES.contains(
272+
((StatusRuntimeException) e).getStatus().getCode()));
273+
}
274+
255275
@Override
256276
public void close() {
257277
if (channel != null) {

jflyte-utils/src/main/java/org/flyte/jflyte/utils/GrpcRetries.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

jflyte-utils/src/main/java/org/flyte/jflyte/utils/PackageLoader.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.nio.channels.ReadableByteChannel;
2323
import java.nio.file.Files;
2424
import java.nio.file.Path;
25+
import java.nio.file.StandardCopyOption;
2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
@@ -85,17 +86,33 @@ private static CompletableFuture<Void> handleArtifact(
8586
() -> handleArtifact(fileSystems, artifact, tmp), executorService);
8687
}
8788

89+
private static final Retries IO_RETRIES =
90+
Retries.create(
91+
/* maxRetries= */ 5,
92+
/* maxDelayMilliseconds= */ 10_000L,
93+
/* initialDelayMilliseconds= */ 1_000L,
94+
/* sleeper= */ Thread::sleep,
95+
e -> e instanceof IOException);
96+
8897
private static void handleArtifact(
8998
Map<String, FileSystem> fileSystems, Artifact artifact, Path tmp) {
9099
Path path = tmp.resolve(artifact.name());
91100
FileSystem fileSystem = FileSystemLoader.getFileSystem(fileSystems, artifact.location());
92101

93-
try (ReadableByteChannel reader = fileSystem.reader(artifact.location())) {
94-
LOG.debug("Copied {} to {}", artifact.location(), path);
102+
try {
103+
IO_RETRIES.retryChecked(
104+
() -> {
105+
try (ReadableByteChannel reader = fileSystem.reader(artifact.location())) {
106+
LOG.debug("Copying {} to {}", artifact.location(), path);
95107

96-
Files.copy(Channels.newInputStream(reader), path);
108+
Files.copy(
109+
Channels.newInputStream(reader), path, StandardCopyOption.REPLACE_EXISTING);
110+
}
111+
});
97112
} catch (IOException e) {
98113
throw new UncheckedIOException(e);
114+
} catch (Exception e) {
115+
throw new RuntimeException(e);
99116
}
100117
}
101118
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2020-2026 Flyte Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
package org.flyte.jflyte.utils;
18+
19+
import com.google.auto.value.AutoValue;
20+
import com.google.errorprone.annotations.Var;
21+
import java.util.concurrent.Callable;
22+
import java.util.function.Predicate;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/** Tiny utility to retry operations with exponential backoff. */
27+
@AutoValue
28+
abstract class Retries {
29+
private static final Logger LOG = LoggerFactory.getLogger(Retries.class);
30+
31+
public abstract int maxRetries();
32+
33+
public abstract long maxDelayMilliseconds();
34+
35+
public abstract long initialDelayMilliseconds();
36+
37+
abstract Sleeper sleeper();
38+
39+
abstract Predicate<Exception> retryableCheck();
40+
41+
public interface Sleeper {
42+
void sleep(long millis) throws InterruptedException;
43+
}
44+
45+
/** Like Callable, but doesn't throw a checked exception. */
46+
interface Retryable<T> extends Callable<T> {
47+
@Override
48+
T call();
49+
}
50+
51+
interface CheckedRunnable {
52+
void run() throws Exception;
53+
}
54+
55+
/**
56+
* Retry a callable that only throws unchecked exceptions. Uses a sneaky-throw to delegate to
57+
* {@link #retryInternal} without catching and re-wrapping: the compiler infers {@code E} as
58+
* {@code RuntimeException}, so no checked exception is declared at call sites, while the original
59+
* exception propagates unchanged at runtime thanks to type erasure.
60+
*
61+
* @param retryable The retry function.
62+
* @return The result of the retry.
63+
* @param <T> The type T of the return resolved object.
64+
* @param <E> Inferred as RuntimeException at call sites; enables sneaky-throw.
65+
*/
66+
@SuppressWarnings("unchecked")
67+
public <T, E extends Exception> T retry(Retryable<T> retryable) throws E {
68+
try {
69+
return retryInternal(retryable);
70+
} catch (Exception e) {
71+
throw (E) e;
72+
}
73+
}
74+
75+
/**
76+
* Retry a runnable that may throw checked exceptions.
77+
*
78+
* @param runnable The retry function.
79+
*/
80+
public void retryChecked(CheckedRunnable runnable) throws Exception {
81+
retryInternal(
82+
() -> {
83+
runnable.run();
84+
return null;
85+
});
86+
}
87+
88+
private <T> T retryInternal(Callable<T> callable) throws Exception {
89+
@Var int attempt = 0;
90+
91+
do {
92+
try {
93+
return callable.call();
94+
} catch (Exception e) {
95+
if (attempt < maxRetries() && retryableCheck().test(e)) {
96+
long delay =
97+
Math.min(maxDelayMilliseconds(), (1L << attempt) * initialDelayMilliseconds());
98+
LOG.warn("Retrying in " + delay + " ms", e);
99+
100+
try {
101+
sleeper().sleep(delay);
102+
} catch (InterruptedException interrupted) {
103+
Thread.currentThread().interrupt();
104+
throw new RuntimeException(e);
105+
}
106+
107+
attempt++;
108+
} else {
109+
throw e;
110+
}
111+
}
112+
} while (true);
113+
}
114+
115+
static Retries create(
116+
int maxRetries,
117+
long maxDelayMilliseconds,
118+
long initialDelayMilliseconds,
119+
Sleeper sleeper,
120+
Predicate<Exception> retryableCheck) {
121+
return new AutoValue_Retries(
122+
maxRetries, maxDelayMilliseconds, initialDelayMilliseconds, sleeper, retryableCheck);
123+
}
124+
}

0 commit comments

Comments
 (0)