@@ -54,6 +54,19 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
5454
5555 val sparkConf : SparkConf = sqlContext.sparkContext.getConf
5656
57+ lazy val endpoint : RedisEndpoint = RedisEndpoint (
58+ host = sparkConf.get(" spark.redis.host" ),
59+ port = sparkConf.get(" spark.redis.port" ).toInt,
60+ password = sparkConf.get(" spark.redis.password" , " " )
61+ )
62+
63+ lazy val properties : RedisWriteProperties = RedisWriteProperties (
64+ maxJitterSeconds = sparkConf.get(" spark.redis.properties.maxJitter" ).toInt,
65+ pipelineSize = sparkConf.get(" spark.redis.properties.pipelineSize" ).toInt
66+ )
67+
68+ lazy val isClusterMode : Boolean = checkIfInClusterMode(endpoint)
69+
5770 def newJedisClient (endpoint : RedisEndpoint ): Jedis = {
5871 val jedis = new Jedis (endpoint.host, endpoint.port)
5972 if (endpoint.password.nonEmpty) {
@@ -78,18 +91,6 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
7891 .localCheckpoint()
7992 else data
8093
81- val endpoint = RedisEndpoint (
82- host = sparkConf.get(" spark.redis.host" ),
83- port = sparkConf.get(" spark.redis.port" ).toInt,
84- password = sparkConf.get(" spark.redis.password" , " " )
85- )
86- val properties = RedisWriteProperties (
87- maxJitterSeconds = sparkConf.get(" spark.redis.properties.maxJitter" ).toInt,
88- pipelineSize = sparkConf.get(" spark.redis.properties.pipelineSize" ).toInt
89- )
90-
91- val isClusterMode = checkIfInClusterMode(endpoint)
92-
9394 dataToStore.foreachPartition { partition : Iterator [Row ] =>
9495 java.security.Security .setProperty(" networkaddress.cache.ttl" , " 3" );
9596 java.security.Security .setProperty(" networkaddress.cache.negative.ttl" , " 0" );
0 commit comments