Commit 311db46

implement data validation in Scala to avoid Python dependencies (#148)
* implement data validation in Scala to avoid Python dependencies
* fix formatting
* reuse Spark session from existing class
* don't parse expectation config in RowValidator

Signed-off-by: KeshavSharma <keshav.sharma@gojek.com>
Co-authored-by: KeshavSharma <keshav.sharma@gojek.com>
1 parent 141a312 commit 311db46

File tree: 6 files changed (+479 lines, -9 lines)

spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala

Lines changed: 3 additions & 2 deletions

@@ -40,8 +40,9 @@ object BatchPipeline extends BasePipeline {
       config: IngestionJobConfig
   ): Option[StreamingQuery] = {
     val featureTable = config.featureTable
-    val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn)
-    val metrics = new IngestionPipelineMetrics
+    val rowValidator =
+      new RowValidator(featureTable, config.source.eventTimestampColumn, config.expectationSpec)
+    val metrics = new IngestionPipelineMetrics
 
     val input = config.source match {
       case source: BQSource =>

spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala

Lines changed: 3 additions & 1 deletion

@@ -67,7 +67,9 @@ object IngestionJob {
           featureTable = ft,
           streamingTriggeringSecs = ft.labels.getOrElse("_streaming_trigger_secs", "0").toInt,
           validationConfig =
-            ft.labels.get("_validation").map(parseJSON(_).camelizeKeys.extract[ValidationConfig])
+            ft.labels.get("_validation").map(parseJSON(_).camelizeKeys.extract[ValidationConfig]),
+          expectationSpec =
+            ft.labels.get("_expectations").map(parseJSON(_).camelizeKeys.extract[ExpectationSpec])
         )
       })
       .required()
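Note: `camelizeKeys` rewrites the snake_case keys of the label JSON into the camelCase fields of the case classes added in IngestionJobConfig.scala below, so the `_expectations` label is expected to carry JSON of roughly the following shape. A minimal, self-contained json4s sketch; the label value, column names, and bounds are invented for illustration:

import org.json4s._
import org.json4s.jackson.JsonMethods.{parse => parseJSON}

// Local mirrors of the case classes added in IngestionJobConfig.scala.
case class ExpectationSpec(expectations: List[Expectation])
case class Expectation(expectationType: String, kwargs: Map[String, String])

object ExpectationLabelExample extends App {
  implicit val formats: Formats = DefaultFormats

  // Hypothetical value of the "_expectations" feature table label.
  val labelJson =
    """{"expectations": [
      |  {"expectation_type": "expect_column_values_to_not_be_null",
      |   "kwargs": {"column": "driver_id"}},
      |  {"expectation_type": "expect_column_values_to_be_between",
      |   "kwargs": {"column": "avg_rating", "min_value": "0", "max_value": "5"}}
      |]}""".stripMargin

  // camelizeKeys: expectation_type -> expectationType, min_value -> minValue, etc.
  // Keys already in camelCase (such as "column") pass through unchanged, which is
  // why RowValidator can later look up kwargs("minValue").
  val spec = parseJSON(labelJson).camelizeKeys.extract[ExpectationSpec]
  println(spec.expectations.map(_.expectationType))
}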

spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala

Lines changed: 10 additions & 0 deletions

@@ -120,6 +120,15 @@ case class ValidationConfig(
     includeArchivePath: String
 )
 
+case class ExpectationSpec(
+    expectations: List[Expectation]
+)
+
+case class Expectation(
+    expectationType: String,
+    kwargs: Map[String, String]
+)
+
 case class IngestionJobConfig(
     mode: Modes = Modes.Offline,
     featureTable: FeatureTable = null,
@@ -132,6 +141,7 @@ case class IngestionJobConfig(
     stencilURL: Option[String] = None,
     streamingTriggeringSecs: Int = 0,
     validationConfig: Option[ValidationConfig] = None,
+    expectationSpec: Option[ExpectationSpec] = None,
     doNotIngestInvalidRows: Boolean = false,
     checkpointPath: Option[String] = None
 )
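Note: `kwargs` is deliberately `Map[String, String]`; numeric bounds travel as strings and are only coerced by Spark when the resulting Column expression is evaluated. For illustration, the spec parsed from the label sketch above could equally be built directly (column names remain hypothetical):

val spec = ExpectationSpec(
  expectations = List(
    Expectation("expect_column_values_to_not_be_null", Map("column" -> "driver_id")),
    Expectation(
      "expect_column_values_to_be_between",
      Map("column" -> "avg_rating", "minValue" -> "0", "maxValue" -> "5")
    )
  )
)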

spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala

Lines changed: 3 additions & 2 deletions

@@ -54,8 +54,9 @@ object StreamingPipeline extends BasePipeline with Serializable {
   ): Option[StreamingQuery] = {
     import sparkSession.implicits._
 
-    val featureTable = config.featureTable
-    val rowValidator = new RowValidator(featureTable, config.source.eventTimestampColumn)
+    val featureTable = config.featureTable
+    val rowValidator =
+      new RowValidator(featureTable, config.source.eventTimestampColumn, config.expectationSpec)
     val metrics = new IngestionPipelineMetrics
     val streamingMetrics = new StreamingMetrics

spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala

Lines changed: 39 additions & 4 deletions

@@ -16,11 +16,16 @@
  */
 package feast.ingestion.validation
 
-import feast.ingestion.FeatureTable
+import feast.ingestion.{FeatureTable, ExpectationSpec, Expectation}
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.{col, lit}
+
+class RowValidator(
+    featureTable: FeatureTable,
+    timestampColumn: String,
+    expectationSpec: Option[ExpectationSpec]
+) extends Serializable {
 
-class RowValidator(featureTable: FeatureTable, timestampColumn: String) extends Serializable {
   def allEntitiesPresent: Column =
     featureTable.entities.map(e => col(e.name).isNotNull).reduce(_.&&(_))
 
@@ -30,6 +35,36 @@ class RowValidator(featureTable: FeatureTable, timestampColumn: String) extends
   def timestampPresent: Column =
     col(timestampColumn).isNotNull
 
+  def expectColumnValuesToBeBetween(expectation: Expectation): Column = {
+    val minValue: Option[String] = expectation.kwargs.get("minValue")
+    val maxValue: Option[String] = expectation.kwargs.get("maxValue")
+
+    (minValue, maxValue) match {
+      case (Some(min), Some(max)) => col(expectation.kwargs("column")).between(min, max)
+      case (Some(min), None) => col(expectation.kwargs("column")).>=(min)
+      case (None, Some(max)) => col(expectation.kwargs("column")).<=(max)
+      case _ => lit(true)
+    }
+  }
+
+  def validate(expectation: Expectation): Column = {
+    expectation.expectationType match {
+      case "expect_column_values_to_not_be_null" => col(expectation.kwargs("column")).isNotNull
+      case "expect_column_values_to_be_between" => expectColumnValuesToBeBetween(expectation)
+      case _ => lit(true)
+    }
+  }
+
+  def validationChecks: Column = {
+
+    expectationSpec match {
+      case Some(value) if value.expectations.isEmpty => lit(true)
+      case Some(value) =>
+        value.expectations.map(expectation => validate(expectation)).reduce(_.&&(_))
+      case None => lit(true)
+    }
+  }
+
   def allChecks: Column =
-    allEntitiesPresent && timestampPresent
+    allEntitiesPresent && timestampPresent && validationChecks
 }
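Note: to see what `allChecks` evaluates to at runtime, here is a self-contained Spark sketch that builds the same conjunction of Column expressions from a list of expectations and splits a toy DataFrame on it. It mirrors RowValidator rather than reusing it (constructing a real FeatureTable is out of scope here), and all data and column names are invented:

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object ValidationChecksDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("validation-demo").getOrCreate()
  import spark.implicits._

  // Same shape as the Expectation case class in IngestionJobConfig.scala.
  case class Expectation(expectationType: String, kwargs: Map[String, String])

  // Mirrors RowValidator.validate for the two supported expectation types.
  // For brevity this assumes both bounds are present; the validator above
  // also handles one-sided "between" expectations.
  def validate(e: Expectation): Column = e.expectationType match {
    case "expect_column_values_to_not_be_null" => col(e.kwargs("column")).isNotNull
    case "expect_column_values_to_be_between" =>
      col(e.kwargs("column")).between(e.kwargs("minValue"), e.kwargs("maxValue"))
    case _ => lit(true)
  }

  val expectations = List(
    Expectation("expect_column_values_to_not_be_null", Map("column" -> "driver_id")),
    Expectation(
      "expect_column_values_to_be_between",
      Map("column" -> "avg_rating", "minValue" -> "0", "maxValue" -> "5")
    )
  )
  val checks: Column = expectations.map(validate).reduce(_ && _)

  val df = Seq(("d1", 4.5), ("d2", 9.9), (null, 3.0)).toDF("driver_id", "avg_rating")
  df.filter(checks).show()  // keeps only ("d1", 4.5)
  df.filter(!checks).show() // rows that fail a check
}

BatchPipeline and StreamingPipeline apply the combined Column in the same spirit, filtering input rows on rowValidator.allChecks; how rows that fail are treated is governed by options such as doNotIngestInvalidRows in IngestionJobConfig.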
