Skip to content

Commit 8a0f53b

Browse files
khorshuheng authored and feast-ci-bot committed
Handle retry for redis io flow (feast-dev#274)
1 parent 3d44ad7 commit 8a0f53b

File tree

6 files changed

+265
-55
lines changed

6 files changed

+265
-55
lines changed

ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,20 @@ public PDone expand(PCollection<FeatureRow> input) {
8989
switch (storeType) {
9090
case REDIS:
9191
RedisConfig redisConfig = getStore().getRedisConfig();
92-
input
92+
PCollection<FailedElement> redisWriteResult = input
9393
.apply(
9494
"FeatureRowToRedisMutation",
9595
ParDo.of(new FeatureRowToRedisMutationDoFn(getFeatureSets())))
9696
.apply(
9797
"WriteRedisMutationToRedis",
98-
RedisCustomIO.write(redisConfig.getHost(), redisConfig.getPort()));
98+
RedisCustomIO.write(redisConfig));
99+
if (options.getDeadLetterTableSpec() != null) {
100+
redisWriteResult.apply(
101+
WriteFailedElementToBigQuery.newBuilder()
102+
.setTableSpec(options.getDeadLetterTableSpec())
103+
.setJsonSchema(ResourceUtil.getDeadletterTableSchemaJson())
104+
.build());
105+
}
99106
break;
100107
case BIGQUERY:
101108
BigQueryConfig bigqueryConfig = getStore().getBigqueryConfig();
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package feast.retry;
2+
3+
import org.apache.beam.sdk.util.BackOff;
4+
import org.apache.beam.sdk.util.BackOffUtils;
5+
import org.apache.beam.sdk.util.FluentBackoff;
6+
import org.apache.beam.sdk.util.Sleeper;
7+
import org.joda.time.Duration;
8+
9+
import java.io.IOException;
10+
import java.io.Serializable;
11+
12+
public class BackOffExecutor implements Serializable {
13+
14+
private static FluentBackoff backoff;
15+
16+
public BackOffExecutor(Integer maxRetries, Duration initialBackOff) {
17+
backoff = FluentBackoff.DEFAULT
18+
.withMaxRetries(maxRetries)
19+
.withInitialBackoff(initialBackOff);
20+
}
21+
22+
public void execute(Retriable retriable) throws Exception {
23+
Sleeper sleeper = Sleeper.DEFAULT;
24+
BackOff backOff = backoff.backoff();
25+
while(true) {
26+
try {
27+
retriable.execute();
28+
break;
29+
} catch (Exception e) {
30+
if(retriable.isExceptionRetriable(e) && BackOffUtils.next(sleeper, backOff)) {
31+
retriable.cleanUpAfterFailure();
32+
} else {
33+
throw e;
34+
}
35+
}
36+
}
37+
}
38+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package feast.retry;
2+
3+
/**
 * A unit of work that {@link BackOffExecutor} can run and retry.
 *
 * <p>Implementations perform the work in {@link #execute()}, decide which
 * exceptions are worth retrying, and restore any state needed between attempts.
 */
public interface Retriable {
  /** Performs the work; may throw an unchecked exception on failure. */
  void execute();

  /** Returns true when {@code e} is transient and the work should be retried. */
  Boolean isExceptionRetriable(Exception e);

  /**
   * Resets state after a failed attempt (e.g. reopens a connection) before the
   * next retry.
   */
  void cleanUpAfterFailure();
}

ingestion/src/main/java/feast/store/serving/redis/RedisCustomIO.java

Lines changed: 100 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,31 @@
1616
*/
1717
package feast.store.serving.redis;
1818

19+
import feast.core.StoreProto;
20+
import feast.ingestion.values.FailedElement;
21+
import feast.retry.BackOffExecutor;
22+
import feast.retry.Retriable;
1923
import org.apache.avro.reflect.Nullable;
2024
import org.apache.beam.sdk.coders.AvroCoder;
2125
import org.apache.beam.sdk.coders.DefaultCoder;
2226
import org.apache.beam.sdk.transforms.DoFn;
2327
import org.apache.beam.sdk.transforms.PTransform;
2428
import org.apache.beam.sdk.transforms.ParDo;
29+
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
2530
import org.apache.beam.sdk.values.PCollection;
26-
import org.apache.beam.sdk.values.PDone;
31+
import org.apache.commons.lang3.exception.ExceptionUtils;
32+
import org.joda.time.Duration;
33+
import org.joda.time.Instant;
2734
import org.slf4j.Logger;
2835
import org.slf4j.LoggerFactory;
2936
import redis.clients.jedis.Jedis;
3037
import redis.clients.jedis.Pipeline;
3138
import redis.clients.jedis.Response;
39+
import redis.clients.jedis.exceptions.JedisConnectionException;
40+
41+
import java.io.IOException;
42+
import java.util.ArrayList;
43+
import java.util.List;
3244

3345
public class RedisCustomIO {
3446

@@ -39,8 +51,8 @@ public class RedisCustomIO {
3951

4052
private RedisCustomIO() {}
4153

42-
public static Write write(String host, int port) {
43-
return new Write(host, port);
54+
public static Write write(StoreProto.Store.RedisConfig redisConfig) {
55+
return new Write(redisConfig);
4456
}
4557

4658
public enum Method {
@@ -152,12 +164,12 @@ public void setScore(@Nullable Long score) {
152164
}
153165

154166
/** ServingStoreWrite data to a Redis server. */
155-
public static class Write extends PTransform<PCollection<RedisMutation>, PDone> {
167+
public static class Write extends PTransform<PCollection<RedisMutation>, PCollection<FailedElement>> {
156168

157169
private WriteDoFn dofn;
158170

159-
private Write(String host, int port) {
160-
this.dofn = new WriteDoFn(host, port);
171+
private Write(StoreProto.Store.RedisConfig redisConfig) {
172+
this.dofn = new WriteDoFn(redisConfig);
161173
}
162174

163175
public Write withBatchSize(int batchSize) {
@@ -171,24 +183,28 @@ public Write withTimeout(int timeout) {
171183
}
172184

173185
@Override
174-
public PDone expand(PCollection<RedisMutation> input) {
175-
input.apply(ParDo.of(dofn));
176-
return PDone.in(input.getPipeline());
186+
public PCollection<FailedElement> expand(PCollection<RedisMutation> input) {
187+
return input.apply(ParDo.of(dofn));
177188
}
178189

179-
public static class WriteDoFn extends DoFn<RedisMutation, Void> {
190+
public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
180191

181192
private final String host;
182-
private int port;
193+
private final int port;
194+
private final BackOffExecutor backOffExecutor;
195+
private final List<RedisMutation> mutations = new ArrayList<>();
196+
183197
private Jedis jedis;
184198
private Pipeline pipeline;
185-
private int batchCount;
186199
private int batchSize = DEFAULT_BATCH_SIZE;
187200
private int timeout = DEFAULT_TIMEOUT;
188201

189-
WriteDoFn(String host, int port) {
190-
this.host = host;
191-
this.port = port;
202+
WriteDoFn(StoreProto.Store.RedisConfig redisConfig) {
203+
this.host = redisConfig.getHost();
204+
this.port = redisConfig.getPort();
205+
long backoffMs = redisConfig.getInitialBackoffMs() > 0 ? redisConfig.getInitialBackoffMs() : 1;
206+
this.backOffExecutor = new BackOffExecutor(redisConfig.getMaxRetries(),
207+
Duration.millis(backoffMs));
192208
}
193209

194210
public WriteDoFn withBatchSize(int batchSize) {
@@ -212,24 +228,69 @@ public void setup() {
212228

213229
@StartBundle
214230
public void startBundle() {
231+
mutations.clear();
215232
pipeline = jedis.pipelined();
216-
pipeline.multi();
217-
batchCount = 0;
233+
}
234+
235+
    /**
     * Flushes the buffered mutations to Redis inside one MULTI/EXEC pipeline
     * transaction, retrying transient connection failures via the configured
     * {@link BackOffExecutor}. On success the buffer is cleared; on permanent
     * failure the exception propagates with the buffer intact so the caller can
     * dead-letter its contents.
     */
    private void executeBatch() throws Exception {
      backOffExecutor.execute(new Retriable() {
        @Override
        public void execute() {
          // Queue every buffered mutation into a single transaction.
          pipeline.multi();
          mutations.forEach(mutation -> {
            writeRecord(mutation);
            // A null/zero/negative expiry means "no TTL"; set pexpire only
            // when a positive TTL is present.
            if (mutation.getExpiryMillis() != null && mutation.getExpiryMillis() > 0) {
              pipeline.pexpire(mutation.getKey(), mutation.getExpiryMillis());
            }
          });
          pipeline.exec();
          pipeline.sync();
          // The batch is committed, so the buffer can be emptied.
          mutations.clear();
        }

        @Override
        public Boolean isExceptionRetriable(Exception e) {
          // Only transient connection problems are worth retrying.
          return e instanceof JedisConnectionException;
        }

        @Override
        public void cleanUpAfterFailure() {
          // A pipeline that hit a connection error may be in a broken state:
          // close it, reconnect, and open a fresh pipeline before retrying.
          try {
            pipeline.close();
          } catch (IOException e) {
            log.error(String.format("Error while closing pipeline: %s", e.getMessage()));
          }
          jedis = new Jedis(host, port, timeout);
          pipeline = jedis.pipelined();
        }
      });
    }
268+
269+
private FailedElement toFailedElement(RedisMutation mutation, Exception exception, String jobName) {
270+
return FailedElement.newBuilder()
271+
.setJobName(jobName)
272+
.setTransformName("RedisCustomIO")
273+
.setPayload(mutation.getValue().toString())
274+
.setErrorMessage(exception.getMessage())
275+
.setStackTrace(ExceptionUtils.getStackTrace(exception))
276+
.build();
218277
}
219278

220279
    /**
     * Buffers the incoming mutation and flushes to Redis once a full batch has
     * accumulated. When a batch fails permanently, every mutation in it is
     * emitted downstream as a dead-letter {@link FailedElement} instead of
     * failing the bundle.
     */
    @ProcessElement
    public void processElement(ProcessContext context) {
      RedisMutation mutation = context.element();
      mutations.add(mutation);
      if (mutations.size() >= batchSize) {
        try {
          executeBatch();
        } catch (Exception e) {
          // Retries exhausted or non-retriable error: convert each buffered
          // mutation into a dead-letter element and drop the batch.
          mutations.forEach(failedMutation -> {
            FailedElement failedElement = toFailedElement(
                failedMutation, e, context.getPipelineOptions().getJobName());
            context.output(failedElement);
          });
          mutations.clear();
        }
      }
    }
235296

@@ -254,12 +315,19 @@ private Response<?> writeRecord(RedisMutation mutation) {
254315
}
255316

256317
    /**
     * Flushes any partial batch still buffered when the bundle ends. A batch
     * that fails permanently is emitted as dead-letter {@link FailedElement}s.
     */
    @FinishBundle
    public void finishBundle(FinishBundleContext context) throws IOException, InterruptedException {
      if(mutations.size() > 0) {
        try {
          executeBatch();
        } catch (Exception e) {
          mutations.forEach(failedMutation -> {
            FailedElement failedElement = toFailedElement(
                failedMutation, e, context.getPipelineOptions().getJobName());
            // Unlike ProcessContext, FinishBundleContext requires an explicit
            // timestamp and window for each output.
            context.output(failedElement, Instant.now(), GlobalWindow.INSTANCE);
          });
          mutations.clear();
        }
      }
    }
264332

265333
@Teardown

0 commit comments

Comments
 (0)