From 57c16cf4e47e24d0e3f8e4e8b0c5ba90774c8983 Mon Sep 17 00:00:00 2001 From: khorshuheng Date: Tue, 21 Jun 2022 16:07:27 +0800 Subject: [PATCH] Refactor validation spec Signed-off-by: khorshuheng --- .../feast/ingestion/IngestionJobConfig.scala | 6 +- .../ingestion/validation/Expectation.scala | 79 +++++++++++++++++++ .../ingestion/validation/RowValidator.scala | 24 +----- 3 files changed, 82 insertions(+), 27 deletions(-) create mode 100644 spark/ingestion/src/main/scala/feast/ingestion/validation/Expectation.scala diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index 6ae9d99..52faddc 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -17,6 +17,7 @@ package feast.ingestion import feast.ingestion.Modes.Modes +import feast.ingestion.validation.Expectation import org.joda.time.DateTime object Modes extends Enumeration { @@ -124,11 +125,6 @@ case class ExpectationSpec( expectations: List[Expectation] ) -case class Expectation( - expectationType: String, - kwargs: Map[String, String] -) - case class IngestionJobConfig( mode: Modes = Modes.Offline, featureTable: FeatureTable = null, diff --git a/spark/ingestion/src/main/scala/feast/ingestion/validation/Expectation.scala b/spark/ingestion/src/main/scala/feast/ingestion/validation/Expectation.scala new file mode 100644 index 0000000..3f5709b --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/validation/Expectation.scala @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2022 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.validation + +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.{col, lit} +import org.json4s.{CustomSerializer, DefaultFormats, Extraction, Formats, JObject, JValue} + +trait Expectation { + + def validate: Column +} + +case class ExpectColumnValuesToNotBeNull(columnName: String) extends Expectation { + override def validate: Column = col(columnName).isNotNull +} + +case class ExpectColumnValuesToBeBetween( + columnName: String, + minValue: Option[Int], + maxValue: Option[Int] +) extends Expectation { + override def validate: Column = { + (minValue, maxValue) match { + case (Some(min), Some(max)) => col(columnName).between(min, max) + case (Some(min), None) => col(columnName).>=(min) + case (None, Some(max)) => col(columnName).<=(max) + case _ => lit(true) + } + } +} + +object Expectation { + implicit val format: Formats = DefaultFormats + + def extractColumn(kwargs: JValue): String = { + (kwargs \ "column").extract[String] + } + + def apply(expectationType: String, kwargs: JValue): Expectation = { + expectationType match { + case "expect_column_values_to_not_be_null" => + ExpectColumnValuesToNotBeNull(extractColumn(kwargs)) + case "expect_column_values_to_be_between" => + val column = extractColumn(kwargs) + val minValue = (kwargs \ "minValue").toSome.map(_.extract[Int]) + val maxValue = (kwargs \ "maxValue").toSome.map(_.extract[Int]) + ExpectColumnValuesToBeBetween(column, minValue, maxValue) + } + } +} + +object ExpectationCodec + extends CustomSerializer[Expectation](implicit format => + ( + { case x: JObject => + val eType: String = (x \ "expectationType").extract[String] + val kwargs: JValue = (x \ "kwargs") + Expectation(eType, kwargs) + }, + { case x: Expectation => + Extraction.decompose(x) + } + ) + ) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala b/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala index ba5358a..ef638dd 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala @@ -16,7 +16,7 @@ */ package feast.ingestion.validation -import feast.ingestion.{FeatureTable, ExpectationSpec, Expectation} +import feast.ingestion.{FeatureTable, ExpectationSpec} import org.apache.spark.sql.Column import org.apache.spark.sql.functions.{col, lit} @@ -35,32 +35,12 @@ class RowValidator( def timestampPresent: Column = col(timestampColumn).isNotNull - def expectColumnValuesToBeBetween(expectation: Expectation): Column = { - val minValue: Option[String] = expectation.kwargs.get("minValue") - val maxValue: Option[String] = expectation.kwargs.get("maxValue") - - (minValue, maxValue) match { - case (Some(min), Some(max)) => col(expectation.kwargs("column")).between(min, max) - case (Some(min), None) => col(expectation.kwargs("column")).>=(min) - case (None, Some(max)) => col(expectation.kwargs("column")).<=(max) - case _ => lit(true) - } - } - - def validate(expectation: Expectation): Column = { - expectation.expectationType match { - case "expect_column_values_to_not_be_null" => col(expectation.kwargs("column")).isNotNull - case "expect_column_values_to_be_between" => expectColumnValuesToBeBetween(expectation) - case _ => lit(true) - } - } - def validationChecks: Column = { expectationSpec match { case Some(value) if value.expectations.isEmpty => lit(true) case Some(value) => - value.expectations.map(expectation => validate(expectation)).reduce(_.&&(_)) + value.expectations.map(_.validate).reduce(_.&&(_)) case None => lit(true) } }