Skip to content

Commit ed7e74f

Browse files
authored
Add Redis write properties (#125)
* Add Redis write properties

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix pandas udf test

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
1 parent d71c460 commit ed7e74f

File tree

7 files changed

+38
-17
lines changed

7 files changed

+38
-17
lines changed

spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,14 @@ object BasePipeline {
3333
val conf = new SparkConf()
3434

3535
jobConfig.store match {
36-
case RedisConfig(host, port, password, ssl) if password.isEmpty =>
37-
conf
38-
.set("spark.redis.host", host)
39-
.set("spark.redis.port", port.toString)
40-
.set("spark.redis.ssl", ssl.toString)
41-
case RedisConfig(host, port, password, ssl) if password.nonEmpty =>
36+
case RedisConfig(host, port, password, ssl, properties) =>
4237
conf
4338
.set("spark.redis.host", host)
4439
.set("spark.redis.port", port.toString)
4540
.set("spark.redis.password", password)
4641
.set("spark.redis.ssl", ssl.toString)
42+
.set("spark.redis.properties.maxJitter", properties.maxJitterSeconds.toString)
43+
.set("spark.redis.properties.pipelineSize", properties.pipelineSize.toString)
4744
case BigTableConfig(projectId, instanceId) =>
4845
conf
4946
.set("spark.bigtable.projectId", projectId)

spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,15 @@ object Modes extends Enumeration {
2626

2727
abstract class StoreConfig
2828

29-
case class RedisConfig(host: String, port: Int, password: String, ssl: Boolean) extends StoreConfig
30-
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
29+
case class RedisConfig(
30+
host: String,
31+
port: Int,
32+
password: String = "",
33+
ssl: Boolean = false,
34+
properties: RedisWriteProperties = RedisWriteProperties()
35+
) extends StoreConfig
36+
case class RedisWriteProperties(maxJitterSeconds: Int = 3600, pipelineSize: Int = 250)
37+
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
3138
case class CassandraConfig(
3239
connection: CassandraConnection,
3340
keyspace: String,

spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package feast.ingestion.stores.redis
1818

1919
import java.{sql, util}
2020
import com.google.protobuf.Timestamp
21+
import feast.ingestion.RedisWriteProperties
2122
import feast.ingestion.utils.TypeConversion
2223
import org.apache.commons.codec.digest.DigestUtils
2324
import org.apache.spark.{SparkConf, SparkEnv}
@@ -82,6 +83,10 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
8283
port = sparkConf.get("spark.redis.port").toInt,
8384
password = sparkConf.get("spark.redis.password", "")
8485
)
86+
val properties = RedisWriteProperties(
87+
maxJitterSeconds = sparkConf.get("spark.redis.properties.maxJitter").toInt,
88+
pipelineSize = sparkConf.get("spark.redis.properties.pipelineSize").toInt
89+
)
8590

8691
val isClusterMode = checkIfInClusterMode(endpoint)
8792

@@ -96,7 +101,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
96101
}
97102

98103
// grouped iterator to only allocate memory for a portion of rows
99-
partition.grouped(config.iteratorGroupingSize).foreach { batch =>
104+
partition.grouped(properties.pipelineSize).foreach { batch =>
100105
// group by key and keep only latest row per each key
101106
val rowsWithKey: Map[String, Row] =
102107
compactRowsToLatestTimestamp(batch.map(row => dataKeyId(row) -> row)).toMap
@@ -112,7 +117,7 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
112117
val expiryTimestampByKey = keys
113118
.zip(storedValues)
114119
.map { case (key, storedValue) =>
115-
(key, newExpiryTimestamp(rowsWithKey(key), storedValue))
120+
(key, newExpiryTimestamp(rowsWithKey(key), storedValue, properties.maxJitterSeconds))
116121
}
117122
.toMap
118123

@@ -183,9 +188,14 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
183188
}
184189
}
185190

191+
def applyJitter(expiry: Long, maxJitter: Int): Long = {
192+
if (maxJitter > 0) (scala.util.Random.nextInt(maxJitter).toLong * 1000) + expiry else expiry
193+
}
194+
186195
private def newExpiryTimestamp(
187196
row: Row,
188-
value: util.Map[Array[Byte], Array[Byte]]
197+
value: util.Map[Array[Byte], Array[Byte]],
198+
maxJitterSeconds: Int
189199
): Option[java.sql.Timestamp] = {
190200
val currentMaxExpiry: Option[Long] = value.asScala.toMap
191201
.map { case (key, value) =>
@@ -205,9 +215,9 @@ class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisC
205215

206216
(currentMaxExpiry, rowExpiry) match {
207217
case (_, None) => None
208-
case (None, Some(expiry)) => Some(new sql.Timestamp(expiry))
218+
case (None, Some(expiry)) => Some(new sql.Timestamp(applyJitter(expiry, maxJitterSeconds)))
209219
case (Some(currentExpiry), Some(newExpiry)) =>
210-
Some(new sql.Timestamp(currentExpiry max newExpiry))
220+
Some(new sql.Timestamp(currentExpiry max applyJitter(newExpiry, maxJitterSeconds)))
211221
}
212222

213223
}

spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ case class SparkRedisConfig(
2121
projectName: String,
2222
entityColumns: Array[String],
2323
timestampColumn: String,
24-
iteratorGroupingSize: Int = 1000,
2524
timestampPrefix: String = "_ts",
2625
repartitionByEntity: Boolean = true,
2726
maxAge: Long = 0,

spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package feast.ingestion
1818

1919
import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
20-
import com.google.protobuf.util.Timestamps
2120
import feast.ingestion.helpers.DataHelper._
2221
import feast.ingestion.helpers.RedisStorageHelper._
2322
import feast.ingestion.helpers.TestRow
@@ -32,7 +31,6 @@ import org.scalacheck._
3231
import redis.clients.jedis.Jedis
3332

3433
import java.nio.file.Paths
35-
import java.sql.Timestamp
3634
import java.time.Instant
3735
import java.time.temporal.ChronoUnit
3836
import scala.collection.JavaConverters._
@@ -52,6 +50,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
5250
.set("spark.redis.port", container.mappedPort(6379).toString)
5351
.set("spark.redis.password", password)
5452
.set("spark.metrics.conf.*.sink.statsd.port", statsDStub.port.toString)
53+
.set("spark.redis.properties.maxJitter", "0")
54+
.set("spark.redis.properties.pipelineSize", "250")
5555

5656
trait Scope {
5757
val jedis = new Jedis("localhost", container.mappedPort(6379))
@@ -84,6 +84,8 @@ class BatchPipelineIT extends SparkSpec with ForAllTestContainer {
8484
Field("feature2", ValueType.Enum.FLOAT)
8585
)
8686
),
87+
store =
88+
RedisConfig("localhost", 6379, properties = RedisWriteProperties(maxJitterSeconds = 0)),
8789
startTime = DateTime.parse("2020-08-01"),
8890
endTime = DateTime.parse("2020-09-01"),
8991
metrics = Some(StatsDConfig(host = "localhost", port = statsDStub.port))

spark/ingestion/src/test/scala/feast/ingestion/PandasUDF.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class PandasUDF extends SparkSpec with ForAllTestContainer {
4747
override def withSparkConfOverrides(conf: SparkConf): SparkConf = conf
4848
.set("spark.redis.host", container.host)
4949
.set("spark.redis.port", container.mappedPort(6379).toString)
50+
.set("spark.redis.properties.maxJitter", "0")
51+
.set("spark.redis.properties.pipelineSize", "250")
5052

5153
trait Scope {
5254
implicit def testRowEncoder: Encoder[TestRow] = ExpressionEncoder()

spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
5656
.set("spark.redis.host", redisContainer.host)
5757
.set("spark.redis.port", redisContainer.mappedPort(6379).toString)
5858
.set("spark.sql.streaming.checkpointLocation", generateTempPath("checkpoint"))
59+
.set("spark.redis.properties.maxJitter", "0")
60+
.set("spark.redis.properties.pipelineSize", "250")
5961

6062
trait KafkaPublisher {
6163
val props = new Properties()
@@ -91,7 +93,9 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer {
9193
features = Seq(
9294
Field("unique_drivers", ValueType.Enum.INT64)
9395
)
94-
)
96+
),
97+
store =
98+
RedisConfig("localhost", 6379, properties = RedisWriteProperties(maxJitterSeconds = 0))
9599
)
96100

97101
def encodeEntityKey(row: TestMessage, featureTable: FeatureTable): Array[Byte] = {

0 commit comments

Comments (0)