2020import feast .ingestion .values .FailedElement ;
2121import feast .retry .BackOffExecutor ;
2222import feast .retry .Retriable ;
23+ import java .io .IOException ;
24+ import java .util .ArrayList ;
25+ import java .util .List ;
2326import org .apache .avro .reflect .Nullable ;
2427import org .apache .beam .sdk .coders .AvroCoder ;
2528import org .apache .beam .sdk .coders .DefaultCoder ;
3841import redis .clients .jedis .Response ;
3942import redis .clients .jedis .exceptions .JedisConnectionException ;
4043
41- import java .io .IOException ;
42- import java .util .ArrayList ;
43- import java .util .List ;
44-
4544public class RedisCustomIO {
4645
4746 private static final int DEFAULT_BATCH_SIZE = 1000 ;
@@ -164,7 +163,8 @@ public void setScore(@Nullable Long score) {
164163 }
165164
166165 /** ServingStoreWrite data to a Redis server. */
167- public static class Write extends PTransform <PCollection <RedisMutation >, PCollection <FailedElement >> {
166+ public static class Write
167+ extends PTransform <PCollection <RedisMutation >, PCollection <FailedElement >> {
168168
169169 private WriteDoFn dofn ;
170170
@@ -202,9 +202,10 @@ public static class WriteDoFn extends DoFn<RedisMutation, FailedElement> {
202202 WriteDoFn (StoreProto .Store .RedisConfig redisConfig ) {
203203 this .host = redisConfig .getHost ();
204204 this .port = redisConfig .getPort ();
205- long backoffMs = redisConfig .getInitialBackoffMs () > 0 ? redisConfig .getInitialBackoffMs () : 1 ;
206- this .backOffExecutor = new BackOffExecutor (redisConfig .getMaxRetries (),
207- Duration .millis (backoffMs ));
205+ long backoffMs =
206+ redisConfig .getInitialBackoffMs () > 0 ? redisConfig .getInitialBackoffMs () : 1 ;
207+ this .backOffExecutor =
208+ new BackOffExecutor (redisConfig .getMaxRetries (), Duration .millis (backoffMs ));
208209 }
209210
210211 public WriteDoFn withBatchSize (int batchSize ) {
@@ -233,47 +234,50 @@ public void startBundle() {
233234 }
234235
235236 private void executeBatch () throws Exception {
236- backOffExecutor .execute (new Retriable () {
237- @ Override
238- public void execute () {
239- pipeline .multi ();
240- mutations .forEach (mutation -> {
241- writeRecord (mutation );
242- if (mutation .getExpiryMillis () != null && mutation .getExpiryMillis () > 0 ) {
243- pipeline .pexpire (mutation .getKey (), mutation .getExpiryMillis ());
237+ backOffExecutor .execute (
238+ new Retriable () {
239+ @ Override
240+ public void execute () {
241+ pipeline .multi ();
242+ mutations .forEach (
243+ mutation -> {
244+ writeRecord (mutation );
245+ if (mutation .getExpiryMillis () != null && mutation .getExpiryMillis () > 0 ) {
246+ pipeline .pexpire (mutation .getKey (), mutation .getExpiryMillis ());
247+ }
248+ });
249+ pipeline .exec ();
250+ pipeline .sync ();
251+ mutations .clear ();
244252 }
245- });
246- pipeline .exec ();
247- pipeline .sync ();
248- mutations .clear ();
249- }
250253
251- @ Override
252- public Boolean isExceptionRetriable (Exception e ) {
253- return e instanceof JedisConnectionException ;
254- }
254+ @ Override
255+ public Boolean isExceptionRetriable (Exception e ) {
256+ return e instanceof JedisConnectionException ;
257+ }
255258
256- @ Override
257- public void cleanUpAfterFailure () {
258- try {
259- pipeline .close ();
260- } catch (IOException e ) {
261- log .error (String .format ("Error while closing pipeline: %s" , e .getMessage ()));
262- }
263- jedis = new Jedis (host , port , timeout );
264- pipeline = jedis .pipelined ();
265- }
266- });
259+ @ Override
260+ public void cleanUpAfterFailure () {
261+ try {
262+ pipeline .close ();
263+ } catch (IOException e ) {
264+ log .error (String .format ("Error while closing pipeline: %s" , e .getMessage ()));
265+ }
266+ jedis = new Jedis (host , port , timeout );
267+ pipeline = jedis .pipelined ();
268+ }
269+ });
267270 }
268271
269- private FailedElement toFailedElement (RedisMutation mutation , Exception exception , String jobName ) {
272+ private FailedElement toFailedElement (
273+ RedisMutation mutation , Exception exception , String jobName ) {
270274 return FailedElement .newBuilder ()
271- .setJobName (jobName )
272- .setTransformName ("RedisCustomIO" )
273- .setPayload (mutation .getValue ().toString ())
274- .setErrorMessage (exception .getMessage ())
275- .setStackTrace (ExceptionUtils .getStackTrace (exception ))
276- .build ();
275+ .setJobName (jobName )
276+ .setTransformName ("RedisCustomIO" )
277+ .setPayload (mutation .getValue ().toString ())
278+ .setErrorMessage (exception .getMessage ())
279+ .setStackTrace (ExceptionUtils .getStackTrace (exception ))
280+ .build ();
277281 }
278282
279283 @ ProcessElement
@@ -284,11 +288,12 @@ public void processElement(ProcessContext context) {
284288 try {
285289 executeBatch ();
286290 } catch (Exception e ) {
287- mutations .forEach (failedMutation -> {
288- FailedElement failedElement = toFailedElement (
289- failedMutation , e , context .getPipelineOptions ().getJobName ());
290- context .output (failedElement );
291- });
291+ mutations .forEach (
292+ failedMutation -> {
293+ FailedElement failedElement =
294+ toFailedElement (failedMutation , e , context .getPipelineOptions ().getJobName ());
295+ context .output (failedElement );
296+ });
292297 mutations .clear ();
293298 }
294299 }
@@ -315,16 +320,18 @@ private Response<?> writeRecord(RedisMutation mutation) {
315320 }
316321
317322 @ FinishBundle
318- public void finishBundle (FinishBundleContext context ) throws IOException , InterruptedException {
319- if (mutations .size () > 0 ) {
323+ public void finishBundle (FinishBundleContext context )
324+ throws IOException , InterruptedException {
325+ if (mutations .size () > 0 ) {
320326 try {
321327 executeBatch ();
322328 } catch (Exception e ) {
323- mutations .forEach (failedMutation -> {
324- FailedElement failedElement = toFailedElement (
325- failedMutation , e , context .getPipelineOptions ().getJobName ());
326- context .output (failedElement , Instant .now (), GlobalWindow .INSTANCE );
327- });
329+ mutations .forEach (
330+ failedMutation -> {
331+ FailedElement failedElement =
332+ toFailedElement (failedMutation , e , context .getPipelineOptions ().getJobName ());
333+ context .output (failedElement , Instant .now (), GlobalWindow .INSTANCE );
334+ });
328335 mutations .clear ();
329336 }
330337 }
0 commit comments