1616 */
1717package feast .store .serving .redis ;
1818
19+ import feast .core .StoreProto ;
20+ import feast .ingestion .values .FailedElement ;
21+ import feast .retry .BackOffExecutor ;
22+ import feast .retry .Retriable ;
1923import org .apache .avro .reflect .Nullable ;
2024import org .apache .beam .sdk .coders .AvroCoder ;
2125import org .apache .beam .sdk .coders .DefaultCoder ;
2226import org .apache .beam .sdk .transforms .DoFn ;
2327import org .apache .beam .sdk .transforms .PTransform ;
2428import org .apache .beam .sdk .transforms .ParDo ;
29+ import org .apache .beam .sdk .transforms .windowing .GlobalWindow ;
2530import org .apache .beam .sdk .values .PCollection ;
26- import org .apache .beam .sdk .values .PDone ;
31+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
32+ import org .joda .time .Duration ;
33+ import org .joda .time .Instant ;
2734import org .slf4j .Logger ;
2835import org .slf4j .LoggerFactory ;
2936import redis .clients .jedis .Jedis ;
3037import redis .clients .jedis .Pipeline ;
3138import redis .clients .jedis .Response ;
39+ import redis .clients .jedis .exceptions .JedisConnectionException ;
40+
41+ import java .io .IOException ;
42+ import java .util .ArrayList ;
43+ import java .util .List ;
3244
3345public class RedisCustomIO {
3446
@@ -39,8 +51,8 @@ public class RedisCustomIO {
3951
4052 private RedisCustomIO () {}
4153
42- public static Write write (String host , int port ) {
43- return new Write (host , port );
54+ public static Write write (StoreProto . Store . RedisConfig redisConfig ) {
55+ return new Write (redisConfig );
4456 }
4557
4658 public enum Method {
@@ -152,12 +164,12 @@ public void setScore(@Nullable Long score) {
152164 }
153165
154166 /** ServingStoreWrite data to a Redis server. */
155- public static class Write extends PTransform <PCollection <RedisMutation >, PDone > {
167+ public static class Write extends PTransform <PCollection <RedisMutation >, PCollection < FailedElement > > {
156168
157169 private WriteDoFn dofn ;
158170
159- private Write (String host , int port ) {
160- this .dofn = new WriteDoFn (host , port );
171+ private Write (StoreProto . Store . RedisConfig redisConfig ) {
172+ this .dofn = new WriteDoFn (redisConfig );
161173 }
162174
163175 public Write withBatchSize (int batchSize ) {
@@ -171,24 +183,28 @@ public Write withTimeout(int timeout) {
171183 }
172184
173185 @ Override
174- public PDone expand (PCollection <RedisMutation > input ) {
175- input .apply (ParDo .of (dofn ));
176- return PDone .in (input .getPipeline ());
186+ public PCollection <FailedElement > expand (PCollection <RedisMutation > input ) {
187+ return input .apply (ParDo .of (dofn ));
177188 }
178189
179- public static class WriteDoFn extends DoFn <RedisMutation , Void > {
190+ public static class WriteDoFn extends DoFn <RedisMutation , FailedElement > {
180191
181192 private final String host ;
182- private int port ;
193+ private final int port ;
194+ private final BackOffExecutor backOffExecutor ;
195+ private final List <RedisMutation > mutations = new ArrayList <>();
196+
183197 private Jedis jedis ;
184198 private Pipeline pipeline ;
185- private int batchCount ;
186199 private int batchSize = DEFAULT_BATCH_SIZE ;
187200 private int timeout = DEFAULT_TIMEOUT ;
188201
189- WriteDoFn (String host , int port ) {
190- this .host = host ;
191- this .port = port ;
202+ WriteDoFn (StoreProto .Store .RedisConfig redisConfig ) {
203+ this .host = redisConfig .getHost ();
204+ this .port = redisConfig .getPort ();
205+ long backoffMs = redisConfig .getInitialBackoffMs () > 0 ? redisConfig .getInitialBackoffMs () : 1 ;
206+ this .backOffExecutor = new BackOffExecutor (redisConfig .getMaxRetries (),
207+ Duration .millis (backoffMs ));
192208 }
193209
194210 public WriteDoFn withBatchSize (int batchSize ) {
@@ -212,24 +228,69 @@ public void setup() {
212228
213229 @ StartBundle
214230 public void startBundle () {
231+ mutations .clear ();
215232 pipeline = jedis .pipelined ();
216- pipeline .multi ();
217- batchCount = 0 ;
233+ }
234+
235+ private void executeBatch () throws Exception {
236+ backOffExecutor .execute (new Retriable () {
237+ @ Override
238+ public void execute () {
239+ pipeline .multi ();
240+ mutations .forEach (mutation -> {
241+ writeRecord (mutation );
242+ if (mutation .getExpiryMillis () != null && mutation .getExpiryMillis () > 0 ) {
243+ pipeline .pexpire (mutation .getKey (), mutation .getExpiryMillis ());
244+ }
245+ });
246+ pipeline .exec ();
247+ pipeline .sync ();
248+ mutations .clear ();
249+ }
250+
251+ @ Override
252+ public Boolean isExceptionRetriable (Exception e ) {
253+ return e instanceof JedisConnectionException ;
254+ }
255+
256+ @ Override
257+ public void cleanUpAfterFailure () {
258+ try {
259+ pipeline .close ();
260+ } catch (IOException e ) {
261+ log .error (String .format ("Error while closing pipeline: %s" , e .getMessage ()));
262+ }
263+ jedis = new Jedis (host , port , timeout );
264+ pipeline = jedis .pipelined ();
265+ }
266+ });
267+ }
268+
269+ private FailedElement toFailedElement (RedisMutation mutation , Exception exception , String jobName ) {
270+ return FailedElement .newBuilder ()
271+ .setJobName (jobName )
272+ .setTransformName ("RedisCustomIO" )
273+ .setPayload (mutation .getValue ().toString ())
274+ .setErrorMessage (exception .getMessage ())
275+ .setStackTrace (ExceptionUtils .getStackTrace (exception ))
276+ .build ();
218277 }
219278
220279 @ ProcessElement
221280 public void processElement (ProcessContext context ) {
222281 RedisMutation mutation = context .element ();
223- writeRecord (mutation );
224- if (mutation .getExpiryMillis () != null && mutation .getExpiryMillis () > 0 ) {
225- pipeline .pexpire (mutation .getKey (), mutation .getExpiryMillis ());
226- }
227- batchCount ++;
228- if (batchCount >= batchSize ) {
229- pipeline .exec ();
230- pipeline .sync ();
231- pipeline .multi ();
232- batchCount = 0 ;
282+ mutations .add (mutation );
283+ if (mutations .size () >= batchSize ) {
284+ try {
285+ executeBatch ();
286+ } catch (Exception e ) {
287+ mutations .forEach (failedMutation -> {
288+ FailedElement failedElement = toFailedElement (
289+ failedMutation , e , context .getPipelineOptions ().getJobName ());
290+ context .output (failedElement );
291+ });
292+ mutations .clear ();
293+ }
233294 }
234295 }
235296
@@ -254,12 +315,19 @@ private Response<?> writeRecord(RedisMutation mutation) {
254315 }
255316
256317 @ FinishBundle
257- public void finishBundle () {
258- if (pipeline .isInMulti ()) {
259- pipeline .exec ();
260- pipeline .sync ();
318+ public void finishBundle (FinishBundleContext context ) throws IOException , InterruptedException {
319+ if (mutations .size () > 0 ) {
320+ try {
321+ executeBatch ();
322+ } catch (Exception e ) {
323+ mutations .forEach (failedMutation -> {
324+ FailedElement failedElement = toFailedElement (
325+ failedMutation , e , context .getPipelineOptions ().getJobName ());
326+ context .output (failedElement , Instant .now (), GlobalWindow .INSTANCE );
327+ });
328+ mutations .clear ();
329+ }
261330 }
262- batchCount = 0 ;
263331 }
264332
265333 @ Teardown
0 commit comments