Commit 6d7f6e5

pyalex and adchia authored
Validating historical features against reference dataset with "great expectations" profiler (feast-dev#2243)
* Validating historical features against reference dataset
* fixes after rebase
* fixes after rebase
* update ci requirements
* some reverts
* format
* move ValidationReference proto message
* grammar in docs
* Update docs/reference/dqm.md
* rebase & resolve conflicts
* remove RetrievalJobWithValidation
* Update docs/reference/dqm.md
* fix type hint
* fix function flow
* more docstrings
* update docs
* improve docs

Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
Co-authored-by: Danny Chiao <d.chiao@gmail.com>
1 parent e2bbbfd commit 6d7f6e5

19 files changed

Lines changed: 1358 additions & 120 deletions

File tree

docs/reference/dqm.md

Lines changed: 77 additions & 0 deletions
# Data Quality Monitoring

Data Quality Monitoring (DQM) is a Feast module that helps users validate their data against a user-curated set of rules.
Validation can be applied during:
* Historical retrieval (training dataset generation)
* [planned] Writing features into an online store
* [planned] Reading features from an online store

Its goal is to address several complex data problems, namely:
* Data consistency - new training datasets can be significantly different from previous datasets. This might require a change in model architecture.
* Issues/bugs in the upstream pipeline - bugs in upstream pipelines can cause invalid values to overwrite existing valid values in an online store.
* Training/serving skew - distribution shift could significantly decrease the performance of the model.

> To monitor data quality, we check that the characteristics of the tested dataset (aka the tested dataset's profile) are "equivalent" to the characteristics of the reference dataset.
> How exactly profile equivalency should be measured is up to the user.
### Overview

The validation process consists of the following steps:
1. The user prepares a reference dataset (currently only [saved datasets](../getting-started/concepts/dataset.md) from historical retrieval are supported).
2. The user defines a profiler function, which produces a profile from a given dataset (currently only profilers based on [Great Expectations](https://docs.greatexpectations.io) are allowed).
3. The tested dataset is validated against the reference dataset, with both the reference and the profiler provided as parameters.
### Preparations
Feast with Great Expectations support can be installed via
```shell
pip install 'feast[ge]'
```
### Dataset profile
Currently, Feast supports only [Great Expectations'](https://greatexpectations.io/) [ExpectationSuite](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/core/expectation_suite/index.html#great_expectations.core.expectation_suite.ExpectationSuite)
as the dataset's profile. Hence, the user needs to define a function (profiler) that receives a dataset and returns an [ExpectationSuite](https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/core/expectation_suite/index.html#great_expectations.core.expectation_suite.ExpectationSuite).

Great Expectations supports automatic profiling as well as manually specifying expectations:
```python
from great_expectations.dataset import Dataset
from great_expectations.core.expectation_suite import ExpectationSuite

from feast.dqm.profilers.ge_profiler import ge_profiler

@ge_profiler
def automatic_profiler(dataset: Dataset) -> ExpectationSuite:
    from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler

    return UserConfigurableProfiler(
        profile_dataset=dataset,
        ignored_columns=['conv_rate'],
        value_set_threshold='few'
    ).build_suite()
```
However, in our experience the capabilities of the automatic profiler are quite limited, so we recommend crafting your own expectations:
```python
@ge_profiler
def manual_profiler(dataset: Dataset) -> ExpectationSuite:
    dataset.expect_column_max_to_be_between("column", 1, 2)
    return dataset.get_expectation_suite()
```
### Validating Training Dataset
During retrieval of historical features, `validation_reference` can be passed as a parameter to the `.to_df(validation_reference=...)` or `.to_arrow(validation_reference=...)` methods of `RetrievalJob`.
If this parameter is provided, Feast will run validation once the dataset is materialized. If validation succeeds, the materialized dataset is returned.
Otherwise, a `feast.dqm.errors.ValidationFailed` exception is raised, containing the details of all expectations that did not pass.

```python
from feast import FeatureStore

fs = FeatureStore(".")

job = fs.get_historical_features(...)
job.to_df(
    validation_reference=fs
    .get_saved_dataset("my_reference_dataset")
    .as_reference(profiler=manual_profiler)
)
```
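Conceptually, the profile-then-validate flow described above can be sketched without Feast itself. All names below (`build_profile`, `validate`, the per-column-maximum check) are illustrative stand-ins, not Feast API; the real implementation delegates both steps to Great Expectations:

```python
# Toy sketch of the flow: build a profile from a reference dataset,
# then validate a tested dataset against it. "Datasets" here are plain
# dicts of column -> list of values; the "profile" is each column's max.

def build_profile(reference):
    # "Profile" = per-column maximum observed in the reference data
    return {col: max(values) for col, values in reference.items()}

def validate(profile, tested):
    # Flag each column whose values exceed the reference maximum
    errors = [
        col for col, values in tested.items()
        if max(values) > profile.get(col, float("inf"))
    ]
    return {"success": not errors, "errors": errors}

reference = {"conv_rate": [0.1, 0.5, 0.9]}
profile = build_profile(reference)

ok_report = validate(profile, {"conv_rate": [0.2, 0.8]})
bad_report = validate(profile, {"conv_rate": [0.2, 1.5]})
# ok_report["success"] is True; bad_report["errors"] == ["conv_rate"]
```

In Feast the analogous report is what `ValidationFailed` carries when validation does not pass.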
Lines changed: 48 additions & 0 deletions
```protobuf
//
// Copyright 2021 The Feast Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "ValidationProfile";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/SavedDataset.proto";

message GEValidationProfiler {
    message UserDefinedProfiler {
        // The python-syntax function body (serialized by dill)
        bytes body = 1;
    }

    UserDefinedProfiler profiler = 1;
}

message GEValidationProfile {
    // JSON-serialized ExpectationSuite object
    bytes expectation_suite = 1;
}

message ValidationReference {
    SavedDataset dataset = 1;

    oneof profiler {
        GEValidationProfiler ge_profiler = 2;
    }
}
```
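`GEValidationProfile` stores the expectation suite as JSON-serialized bytes. The encode/decode round-trip can be sketched with stdlib `json` and a plain dict standing in for the `ExpectationSuite` (the suite contents below are illustrative, not a real suite):

```python
import json

# A plain dict standing in for ExpectationSuite.to_json_dict() output
suite_dict = {
    "expectation_suite_name": "my_suite",
    "expectations": [
        {"expectation_type": "expect_column_max_to_be_between",
         "kwargs": {"column": "conv_rate", "min_value": 0, "max_value": 1}},
    ],
}

# Serialize the way the SDK does for the `expectation_suite` field:
# JSON text encoded to bytes
expectation_suite_bytes = json.dumps(suite_dict).encode()

# Deserialize on the way back from the proto
restored = json.loads(expectation_suite_bytes)
assert restored == suite_dict
```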

sdk/python/feast/dqm/__init__.py

Whitespace-only changes.

sdk/python/feast/dqm/errors.py

Lines changed: 13 additions & 0 deletions
```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .profilers.profiler import ValidationReport


class ValidationFailed(Exception):
    def __init__(self, validation_report: "ValidationReport"):
        self.validation_report = validation_report

    @property
    def report(self) -> "ValidationReport":
        return self.validation_report
```
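In use, callers catch the exception and inspect its report. A minimal sketch, with the class reproduced standalone and a hypothetical stand-in report object (Feast itself is not imported here, and `FakeReport` is not a real Feast type):

```python
class ValidationFailed(Exception):
    # Reproduced standalone for illustration; mirrors feast.dqm.errors
    def __init__(self, validation_report):
        self.validation_report = validation_report

    @property
    def report(self):
        return self.validation_report


class FakeReport:
    # Stand-in for a ValidationReport produced by a profiler
    is_success = False
    errors = ["expect_column_max_to_be_between failed on 'conv_rate'"]


try:
    raise ValidationFailed(FakeReport())
except ValidationFailed as exc:
    # The report travels with the exception, so failure details
    # are available at the call site
    assert not exc.report.is_success
    first_error = exc.report.errors[0]
```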

sdk/python/feast/dqm/profilers/__init__.py

Whitespace-only changes.
Lines changed: 162 additions & 0 deletions
```python
import json
from typing import Any, Callable, Dict, List

import dill
import great_expectations as ge
import numpy as np
import pandas as pd
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset
from great_expectations.profile.base import ProfilerTypeMapping

from feast.dqm.profilers.profiler import (
    Profile,
    Profiler,
    ValidationError,
    ValidationReport,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfile as GEValidationProfileProto,
)
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfiler as GEValidationProfilerProto,
)


def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
    dataset_copy = dataset.copy(deep=True)

    for column in dataset.columns:
        if dataset.expect_column_values_to_be_in_type_list(
            column, type_list=sorted(list(ProfilerTypeMapping.DATETIME_TYPE_NAMES))
        ).success:
            # GE cannot parse Timestamp or other pandas datetime types
            dataset_copy[column] = dataset[column].dt.strftime("%Y-%m-%dT%H:%M:%S")

        if dataset[column].dtype == np.float32:
            # GE converts expectation arguments into native Python floats.
            # This could cause errors on comparison => so better to convert to double prematurely
            dataset_copy[column] = dataset[column].astype(np.float64)

    return dataset_copy


class GEProfile(Profile):
    """
    GEProfile is an implementation of the abstract Profile for integration with Great Expectations.
    It executes validation by applying expectations from an ExpectationSuite instance to a given dataset.
    """

    expectation_suite: ExpectationSuite

    def __init__(self, expectation_suite: ExpectationSuite):
        self.expectation_suite = expectation_suite

    def validate(self, df: pd.DataFrame) -> "GEValidationReport":
        """
        Validate the provided dataframe against the GE expectation suite.
        1. The pandas dataframe is converted into a PandasDataset (GE type)
        2. Some fixes are applied to the data to avoid crashes inside GE (see _prepare_dataset)
        3. Each expectation from the ExpectationSuite instance is tested against the resulting dataset

        Returns a GEValidationReport, which parses Great Expectations' schema into a list of generic ValidationErrors.
        """
        dataset = PandasDataset(df)

        dataset = _prepare_dataset(dataset)

        results = ge.validate(
            dataset, expectation_suite=self.expectation_suite, result_format="COMPLETE"
        )
        return GEValidationReport(results)

    def to_proto(self):
        return GEValidationProfileProto(
            expectation_suite=json.dumps(self.expectation_suite.to_json_dict()).encode()
        )

    @classmethod
    def from_proto(cls, proto: GEValidationProfileProto) -> "GEProfile":
        return GEProfile(
            expectation_suite=ExpectationSuite(**json.loads(proto.expectation_suite))
        )

    def __repr__(self):
        expectations = json.dumps(
            [e.to_json_dict() for e in self.expectation_suite.expectations], indent=2
        )
        return f"<GEProfile with expectations: {expectations}>"


class GEProfiler(Profiler):
    """
    GEProfiler is an implementation of the abstract Profiler for integration with Great Expectations.
    It wraps around a user-defined profiler that should accept a dataset (in the form of a pandas dataframe)
    and return an ExpectationSuite.
    """

    def __init__(
        self, user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite]
    ):
        self.user_defined_profiler = user_defined_profiler

    def analyze_dataset(self, df: pd.DataFrame) -> Profile:
        """
        Generate a GEProfile with an ExpectationSuite (set of expectations)
        from a given pandas dataframe by applying the user-defined profiler.

        Some fixes are also applied to the dataset (see _prepare_dataset function) to make it compatible with GE.

        Returns a GEProfile
        """
        dataset = PandasDataset(df)

        dataset = _prepare_dataset(dataset)

        return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

    def to_proto(self):
        return GEValidationProfilerProto(
            profiler=GEValidationProfilerProto.UserDefinedProfiler(
                body=dill.dumps(self.user_defined_profiler, recurse=True)
            )
        )

    @classmethod
    def from_proto(cls, proto: GEValidationProfilerProto) -> "GEProfiler":
        return GEProfiler(user_defined_profiler=dill.loads(proto.profiler.body))


class GEValidationReport(ValidationReport):
    def __init__(self, validation_result: Dict[Any, Any]):
        self._validation_result = validation_result

    @property
    def is_success(self) -> bool:
        return self._validation_result["success"]

    @property
    def errors(self) -> List["ValidationError"]:
        return [
            ValidationError(
                check_name=res.expectation_config.expectation_type,
                column_name=res.expectation_config.kwargs["column"],
                check_config=res.expectation_config.kwargs,
                missing_count=res["result"].get("missing_count"),
                missing_percent=res["result"].get("missing_percent"),
            )
            for res in self._validation_result["results"]
            if not res["success"]
        ]

    def __repr__(self):
        failed_expectations = [
            res.to_json_dict()
            for res in self._validation_result["results"]
            if not res["success"]
        ]
        return json.dumps(failed_expectations, indent=2)


def ge_profiler(func):
    return GEProfiler(user_defined_profiler=func)
```
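`GEProfiler.to_proto` ships the user-defined profiler itself as serialized bytes via `dill`. The round-trip idea can be sketched with stdlib `pickle` and a module-level function (`my_profiler` below is illustrative; plain pickle serializes a named function by reference, while dill can also serialize lambdas and closures by value, which is why Feast uses dill):

```python
import pickle

# A module-level function stands in for the user-defined profiler
def my_profiler(dataset):
    return {"expectations": ["expect_column_to_exist: conv_rate"]}

body = pickle.dumps(my_profiler)   # analogous to what to_proto stores in `body`
restored = pickle.loads(body)      # analogous to what from_proto reconstructs

# The restored callable behaves like the original
assert restored({"conv_rate": []}) == my_profiler({"conv_rate": []})
```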
Lines changed: 88 additions & 0 deletions
```python
import abc
from typing import Any, List, Optional

import pandas as pd


class Profile:
    @abc.abstractmethod
    def validate(self, dataset: pd.DataFrame) -> "ValidationReport":
        """
        Run the set of rules / expectations from the current profile against the given dataset.

        Returns a ValidationReport
        """
        ...

    @abc.abstractmethod
    def to_proto(self):
        ...

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, proto) -> "Profile":
        ...


class Profiler:
    @abc.abstractmethod
    def analyze_dataset(self, dataset: pd.DataFrame) -> Profile:
        """
        Generate a Profile object with the dataset's characteristics (with rules / expectations)
        from the given dataset (as a pandas dataframe).
        """
        ...

    @abc.abstractmethod
    def to_proto(self):
        ...

    @classmethod
    @abc.abstractmethod
    def from_proto(cls, proto) -> "Profiler":
        ...


class ValidationReport:
    @property
    @abc.abstractmethod
    def is_success(self) -> bool:
        """
        Return whether validation was successful
        """
        ...

    @property
    @abc.abstractmethod
    def errors(self) -> List["ValidationError"]:
        """
        Return a list of ValidationErrors if validation failed (is_success = false)
        """
        ...


class ValidationError:
    check_name: str
    column_name: str

    check_config: Optional[Any]

    missing_count: Optional[int]
    missing_percent: Optional[float]

    def __init__(
        self,
        check_name: str,
        column_name: str,
        check_config: Optional[Any] = None,
        missing_count: Optional[int] = None,
        missing_percent: Optional[float] = None,
    ):
        self.check_name = check_name
        self.column_name = column_name
        self.check_config = check_config
        self.missing_count = missing_count
        self.missing_percent = missing_percent

    def __repr__(self):
        return f"<ValidationError {self.check_name}:{self.column_name}>"
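A minimal concrete implementation helps show how these interfaces fit together. The sketch below is stdlib-only (no pandas, no protos), so the "dataset" is a plain dict of column names to value lists, `to_proto`/`from_proto` are omitted, and every `Columns*` name is a hypothetical example, not a Feast class:

```python
from typing import Any, Dict, List

# Toy profile: the reference dataset's set of columns must all be
# present in any tested dataset.

class ColumnsError:
    def __init__(self, check_name: str, column_name: str):
        self.check_name = check_name
        self.column_name = column_name

    def __repr__(self):
        return f"<ColumnsError {self.check_name}:{self.column_name}>"


class ColumnsReport:
    def __init__(self, missing: List[str]):
        self._missing = missing

    @property
    def is_success(self) -> bool:
        return not self._missing

    @property
    def errors(self) -> List[ColumnsError]:
        return [ColumnsError("expect_column_to_exist", col) for col in self._missing]


class ColumnsProfile:
    def __init__(self, required_columns: List[str]):
        self.required_columns = required_columns

    def validate(self, dataset: Dict[str, Any]) -> ColumnsReport:
        missing = [c for c in self.required_columns if c not in dataset]
        return ColumnsReport(missing)


class ColumnsProfiler:
    def analyze_dataset(self, dataset: Dict[str, Any]) -> ColumnsProfile:
        # The "analysis" just records which columns the reference had
        return ColumnsProfile(required_columns=sorted(dataset))


profiler = ColumnsProfiler()
profile = profiler.analyze_dataset({"driver_id": [1], "conv_rate": [0.5]})
report = profile.validate({"driver_id": [2]})  # conv_rate is missing
```

The Profiler/Profile/ValidationReport split mirrors the real GE-backed classes: the profiler learns rules from a reference dataset once, and the resulting profile can then be applied to any number of tested datasets.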
