Skip to content

Commit e579a01

Browse files
authored
Persist information on whether an endpoint is in cluster mode or not (#146)
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com> Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
1 parent fef7c57 commit e579a01

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
5454

5555
val sparkConf: SparkConf = sqlContext.sparkContext.getConf
5656

57+
lazy val endpoint: RedisEndpoint = RedisEndpoint(
58+
host = sparkConf.get("spark.redis.host"),
59+
port = sparkConf.get("spark.redis.port").toInt,
60+
password = sparkConf.get("spark.redis.password", "")
61+
)
62+
63+
lazy val properties: RedisWriteProperties = RedisWriteProperties(
64+
maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
65+
pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
66+
)
67+
68+
lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)
69+
5770
def newJedisClient(endpoint: RedisEndpoint): Jedis = {
5871
val jedis = new Jedis(endpoint.host, endpoint.port)
5972
if (endpoint.password.nonEmpty) {
@@ -78,18 +91,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
7891
.localCheckpoint()
7992
else data
8093

81-
val endpoint = RedisEndpoint(
82-
host = sparkConf.get("spark.redis.host"),
83-
port = sparkConf.get("spark.redis.port").toInt,
84-
password = sparkConf.get("spark.redis.password", "")
85-
)
86-
val properties = RedisWriteProperties(
87-
maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
88-
pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
89-
)
90-
91-
val isClusterMode = checkIfInClusterMode(endpoint)
92-
9394
dataToStore.foreachPartition { partition: Iterator[Row] =>
9495
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
9596
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");

0 commit comments

Comments
 (0)