Skip to content

Commit 501e27b

Browse files
committed
Instantiate Jedis client lazily and only once per JVM process
Signed-off-by: khorshuheng <khor.heng@gojek.com>
1 parent 311db46 commit 501e27b

File tree

1 file changed

+6
-7
lines changed

1 file changed

+6
-7
lines changed

spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
6767

6868
lazy val isClusterMode: Boolean = checkIfInClusterMode(endpoint)
6969

70+
lazy val pipelineProvider: PipelineProvider = if (isClusterMode) {
71+
ClusterPipelineProvider(endpoint)
72+
} else {
73+
SingleNodePipelineProvider(newJedisClient(endpoint))
74+
}
75+
7076
def newJedisClient(endpoint: RedisEndpoint): Jedis = {
7177
val jedis = new Jedis(endpoint.host, endpoint.port)
7278
if (endpoint.password.nonEmpty) {
@@ -95,12 +101,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
95101
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
96102
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
97103

98-
val pipelineProvider = if (isClusterMode) {
99-
ClusterPipelineProvider(endpoint)
100-
} else {
101-
SingleNodePipelineProvider(newJedisClient(endpoint))
102-
}
103-
104104
// grouped iterator to only allocate memory for a portion of rows
105105
partition.grouped(properties.pipelineSize).foreach { batch =>
106106
// group by key and keep only latest row per each key
@@ -146,7 +146,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
146146
}
147147
writePipeline.close()
148148
}
149-
pipelineProvider.close()
150149
}
151150
dataToStore.unpersist()
152151
}

0 commit comments

Comments
 (0)