Skip to content

Commit b5dad5a

Browse files
committed
Retry connection failure while instantiating connection provider
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
1 parent 2871ba6 commit b5dad5a

File tree

3 files changed

+120
-1
lines changed

3 files changed

+120
-1
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2022 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.ingestion.errorhanders
18+
19+
import scala.annotation.tailrec
20+
import scala.concurrent.duration.Duration
21+
22+
object RetryStrategy {
23+
24+
@tailrec
25+
def fixedBackOff[T <: Exception, U](retryInterval: Duration, maxRetries: Int)(
26+
fn: => Either[T, U]
27+
): U = {
28+
fn match {
29+
case Right(x) => x
30+
case Left(_) if maxRetries > 0 => {
31+
Thread.sleep(retryInterval.toMillis)
32+
fixedBackOff(retryInterval, maxRetries - 1)(fn)
33+
}
34+
case Left(e) => throw (e)
35+
}
36+
}
37+
}

spark/ingestion/src/main/scala/feast/ingestion/stores/redis/ClusterPipelineProvider.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@
1616
*/
1717
package feast.ingestion.stores.redis
1818

19+
import feast.ingestion.errorhanders.RetryStrategy
20+
import redis.clients.jedis.exceptions.JedisClusterOperationException
1921
import redis.clients.jedis.{ClusterPipeline, DefaultJedisClientConfig, HostAndPort}
2022
import redis.clients.jedis.providers.ClusterConnectionProvider
2123

2224
import scala.collection.JavaConverters._
25+
import scala.concurrent.duration.DurationInt
26+
import scala.util.{Failure, Success, Try}
2327

2428
/**
2529
* Provide pipeline for Redis cluster.
@@ -31,7 +35,17 @@ case class ClusterPipelineProvider(endpoint: RedisEndpoint) extends PipelineProv
3135
.builder()
3236
.password(endpoint.password)
3337
.build()
34-
val provider = new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG)
38+
val MAX_RECONNECTION_ATTEMPT = 2
39+
val RETRY_INTERVAL = 2.seconds
40+
val provider = RetryStrategy.fixedBackOff(RETRY_INTERVAL, MAX_RECONNECTION_ATTEMPT)(getProvider)
41+
42+
def getProvider: Either[JedisClusterOperationException, ClusterConnectionProvider] = {
43+
Try { new ClusterConnectionProvider(nodes, DEFAULT_CLIENT_CONFIG) } match {
44+
case Success(provider) => Right(provider)
45+
case Failure(e: JedisClusterOperationException) => Left(e)
46+
case Failure(e) => throw e
47+
}
48+
}
3549

3650
/**
3751
* @return a cluster pipeline
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2022 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.ingestion.errorhandlers
18+
19+
import feast.ingestion.UnitSpec
20+
import feast.ingestion.errorhanders.RetryStrategy
21+
22+
import scala.concurrent.duration.DurationInt
23+
import scala.util.{Failure, Try}
24+
25+
class RetryStrategySpec extends UnitSpec {
26+
27+
case class SomeException(message: String) extends Exception
28+
29+
"function that always succeed" should "not be retried" in {
30+
var i = 0
31+
32+
def alwaysSucceedFunc: Either[SomeException, Int] = {
33+
i += 1
34+
Right(0)
35+
}
36+
val result = RetryStrategy.fixedBackOff(1.second, 2)(alwaysSucceedFunc)
37+
i should be(1)
38+
result should be(0)
39+
}
40+
41+
"function that always failed" should "be retried up to the maximum attempt and throw exception" in {
42+
var i = 0
43+
44+
def alwaysFailFunc: Either[SomeException, Int] = {
45+
i += 1
46+
Left(SomeException("error"))
47+
}
48+
val result = Try { RetryStrategy.fixedBackOff(10.milli, 2)(alwaysFailFunc) }
49+
i should be(3)
50+
result should matchPattern { case Failure(_: SomeException) => }
51+
}
52+
53+
"function that succeeds when retried" should "immediately return value when succeeded" in {
54+
var i = 0
55+
56+
def succeedWhenRetriedFunc: Either[SomeException, Int] = {
57+
i += 1
58+
if (i < 2) {
59+
Left(SomeException("error"))
60+
} else {
61+
Right(0)
62+
}
63+
}
64+
val result = RetryStrategy.fixedBackOff(1.second, 2)(succeedWhenRetriedFunc)
65+
i should be(2)
66+
result should be(0)
67+
}
68+
}

0 commit comments

Comments
 (0)