
Commit 17edb99

pyalex and oavdeev authored
Python UDF in Ingestion being used for feature validation (#1234)
* first draft
* restore udf in ingestion job
* it tests for pandas udf
* e2e test
* skip udf build
* $skipITs
* $skipITs
* add reporting to scalatest
* platform specific libs path
* continue on error
* build pylibs for all platforms
* build pylibs for all platforms
* pull python path from env
* freeze dataproc python version
* use python from config
* add pyarrow to package
* drop dataclasses to support 3.6
* support python 3.6
* set spark.yarn.isPython=true for emr
* pass isPython through spark-submit conf
* some cleanup
* lint-java
* Update infra/scripts/build-ingestion-py-dependencies.sh
* revert batch sources fix

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
Co-authored-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent 4928df8 commit 17edb99

38 files changed (+965, -106 lines)

.github/workflows/complete.yml

Lines changed: 11 additions & 0 deletions
@@ -128,6 +128,10 @@ jobs:
           java-version: '11'
           java-package: jdk
           architecture: x64
+      - uses: actions/setup-python@v2
+        with:
+          python-version: '3.6'
+          architecture: 'x64'
       - uses: actions/cache@v2
         with:
           path: ~/.m2/repository
@@ -136,6 +140,13 @@ jobs:
             ${{ runner.os }}-it-maven-
       - name: Run integration tests
         run: make test-java-integration
+      - name: Save report
+        uses: actions/upload-artifact@v2
+        if: failure()
+        with:
+          name: it-report
+          path: spark/ingestion/target/test-reports/TestSuite.txt
+          retention-days: 5
 
   tests-docker-compose:
     needs:

.github/workflows/master_only.yml

Lines changed: 28 additions & 1 deletion
@@ -76,4 +76,31 @@ jobs:
           VERSION=${RELEASE_VERSION:1}
           make build-java-no-tests REVISION=${VERSION}
           gsutil cp ./spark/ingestion/target/feast-ingestion-spark-${VERSION}.jar gs://${PUBLISH_BUCKET}/spark/ingestion/
-          fi
+          fi
+
+  publish-ingestion-pylibs:
+    strategy:
+      matrix:
+        os: [ ubuntu-latest, macos-latest ]
+        python-version: [ 3.6, 3.7, 3.8 ]
+    env:
+      PUBLISH_BUCKET: feast-jobs
+    runs-on: ${{ matrix.os }}
+    steps:
+      - uses: actions/checkout@v2
+      - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
+        with:
+          version: '290.0.1'
+          export_default_credentials: true
+          project_id: ${{ secrets.GCP_PROJECT_ID }}
+          service_account_key: ${{ secrets.GCP_SA_KEY }}
+      - name: Set up Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Create libs archive
+        env:
+          PY_VERSION: ${{ matrix.python-version }}
+        run: |
+          export PLATFORM=$(python -c 'import platform; print(platform.system().lower())')
+          ./infra/scripts/build-ingestion-py-dependencies.sh "py${PY_VERSION}-$PLATFORM" gs://${PUBLISH_BUCKET}/spark/validation/

Makefile

Lines changed: 3 additions & 3 deletions
@@ -45,19 +45,19 @@ lint-java:
 	${MVN} --no-transfer-progress spotless:check
 
 test-java:
-	${MVN} --no-transfer-progress test
+	${MVN} --no-transfer-progress -DskipITs=true test
 
 test-java-integration:
 	${MVN} --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true clean verify
 
 test-java-with-coverage:
-	${MVN} --no-transfer-progress test jacoco:report-aggregate
+	${MVN} --no-transfer-progress -DskipITs=true test jacoco:report-aggregate
 
 build-java:
 	${MVN} clean verify
 
 build-java-no-tests:
-	${MVN} --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true -Drevision=${REVISION} clean package
+	${MVN} --no-transfer-progress -Dmaven.javadoc.skip=true -Dgpg.skip -DskipUTs=true -DskipITs=true -Drevision=${REVISION} clean package
 
 # Python SDK
infra/scripts/build-ingestion-py-dependencies.sh

Lines changed: 17 additions & 0 deletions

@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+set -euo pipefail
+PLATFORM=$1
+DESTINATION=$2
+PACKAGES=${PACKAGES:-"great-expectations==0.13.2 pyarrow==2.0.0"}
+
+tmp_dir=$(mktemp -d)
+
+pip3 install -t ${tmp_dir}/libs $PACKAGES
+
+cd $tmp_dir
+tar -czf pylibs-ge-$PLATFORM.tar.gz libs/
+if [[ $DESTINATION == gs* ]]; then
+  gsutil cp pylibs-ge-$PLATFORM.tar.gz $DESTINATION
+else
+  mv pylibs-ge-$PLATFORM.tar.gz $DESTINATION
+fi
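The archive name produced here, pylibs-ge-<suffix>.tar.gz, is what the workflow above parameterizes as "py${PY_VERSION}-$PLATFORM" and what the validation module later references through GE_PACKED_ARCHIVE. A small sketch of how that suffix is derived, mirroring the workflow's one-liner; the exact naming is inferred from these two files:

import platform
import sys

# Mirrors the workflow: PLATFORM comes from platform.system().lower(),
# PY_VERSION from the build matrix (3.6, 3.7, 3.8).
py_version = f"{sys.version_info.major}.{sys.version_info.minor}"
suffix = f"py{py_version}-{platform.system().lower()}"  # e.g. "py3.7-linux"
archive_name = f"pylibs-ge-{suffix}.tar.gz"
print(archive_name)  # pylibs-ge-py3.7-linux.tar.gz on a Linux CI runner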

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -105,6 +105,7 @@
     <parent.basedir>${maven.multiModuleProjectDirectory}</parent.basedir>
 
     <skipUTs>false</skipUTs>
+    <skipITs>false</skipITs>
     <feast.auth.providers.http.client.package.name>feast.common.auth.providers.http.client</feast.auth.providers.http.client.package.name>
   </properties>
 
sdk/python/feast/constants.py

Lines changed: 4 additions & 0 deletions
@@ -222,6 +222,10 @@ class ConfigOptions(metaclass=ConfigMeta):
     #: https://github.com/gojekfarm/stencil
     STENCIL_URL: str = ""
 
+    #: If set to true, rows that do not pass custom validation (see feast.contrib.validation)
+    #: won't be saved to Online Storage
+    INGESTION_DROP_INVALID_ROWS = "False"
+
     #: EMR cluster to run Feast Spark Jobs in
     EMR_CLUSTER_ID: Optional[str] = None
 
sdk/python/feast/contrib/__init__.py

Whitespace-only changes.

sdk/python/feast/contrib/validation/__init__.py

Whitespace-only changes.
sdk/python/feast/contrib/validation/base.py

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+import io
+
+try:
+    from pyspark import cloudpickle
+except ImportError:
+    raise ImportError("pyspark must be installed to enable validation functionality")
+
+
+def serialize_udf(fun, return_type) -> bytes:
+    buffer = io.BytesIO()
+    command = (fun, return_type)
+    cloudpickle.dump(command, buffer)
+    return buffer.getvalue()
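serialize_udf pickles the (function, return type) pair with PySpark's vendored cloudpickle so the ingestion job can later rebuild the Pandas UDF. A minimal sketch of the inverse step; deserialize_udf is a hypothetical helper shown only to illustrate the payload format and is not part of this commit:

import io
import pickle


def deserialize_udf(pickled_code: bytes):
    # cloudpickle writes a standard pickle stream, so the stdlib unpickler
    # can rebuild the (fun, return_type) tuple produced by serialize_udf.
    fun, return_type = pickle.load(io.BytesIO(pickled_code))
    return fun, return_type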
sdk/python/feast/contrib/validation/ge.py

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+import io
+import json
+from typing import TYPE_CHECKING
+from urllib.parse import urlparse
+
+import pandas as pd
+
+from feast.constants import ConfigOptions
+from feast.contrib.validation.base import serialize_udf
+from feast.staging.storage_client import get_staging_client
+
+try:
+    from great_expectations.core import ExpectationSuite
+    from great_expectations.dataset import PandasDataset
+except ImportError:
+    raise ImportError(
+        "great_expectations must be installed to enable validation functionality. "
+        "Please install feast[validation]"
+    )
+
+try:
+    from pyspark.sql.types import BooleanType
+except ImportError:
+    raise ImportError(
+        "pyspark must be installed to enable validation functionality. "
+        "Please install feast[validation]"
+    )
+
+
+if TYPE_CHECKING:
+    from feast import Client, FeatureTable
+
+
+GE_PACKED_ARCHIVE = "https://storage.googleapis.com/feast-jobs/spark/validation/pylibs-ge-%(platform)s.tar.gz"
+_UNSET = object()
+
+
+class ValidationUDF:
+    def __init__(self, name: str, pickled_code: bytes):
+        self.name = name
+        self.pickled_code = pickled_code
+
+
+def create_validation_udf(name: str, expectations: ExpectationSuite) -> ValidationUDF:
+    """
+    Wraps your expectations into a Spark UDF.
+
+    Expectations should be generated & validated using a training dataset:
+    >>> from great_expectations.dataset import PandasDataset
+    >>> ds = PandasDataset.from_dataset(your_training_df)
+    >>> ds.expect_column_values_to_be_between('column', 0, 100)
+
+    >>> expectations = ds.get_expectation_suite()
+
+    Important: your expectations should pass on the training dataset; only successful checks
+    will be converted and stored in the ExpectationSuite.
+
+    Now you can create a UDF that will validate data during ingestion:
+    >>> create_validation_udf("myValidation", expectations)
+
+    :param name:
+    :param expectations: collection of expectations gathered on the training dataset
+    :return: ValidationUDF with serialized code
+    """
+
+    def udf(df: pd.DataFrame) -> pd.Series:
+        ds = PandasDataset.from_dataset(df)
+        result = ds.validate(expectations, result_format="COMPLETE")
+        valid_rows = pd.Series([True] * df.shape[0])
+
+        for check in result.results:
+            if check.success:
+                continue
+
+            if check.exception_info["raised_exception"]:
+                # ToDo: probably we should mark all rows as invalid
+                continue
+
+            valid_rows.iloc[check.result["unexpected_index_list"]] = False
+
+        return valid_rows
+
+    pickled_code = serialize_udf(udf, BooleanType())
+    return ValidationUDF(name, pickled_code)
+
+
+def apply_validation(
+    client: "Client",
+    feature_table: "FeatureTable",
+    udf: ValidationUDF,
+    validation_window_secs: int,
+    include_py_libs=_UNSET,
+):
+    """
+    Uploads validation UDF code to the staging location &
+    stores the path to the UDF code and required python libraries as FeatureTable labels.
+    """
+    include_py_libs = (
+        include_py_libs if include_py_libs is not _UNSET else GE_PACKED_ARCHIVE
+    )
+
+    staging_location = client._config.get(ConfigOptions.SPARK_STAGING_LOCATION).rstrip(
+        "/"
+    )
+    staging_scheme = urlparse(staging_location).scheme
+    staging_client = get_staging_client(staging_scheme)
+
+    pickled_code_fp = io.BytesIO(udf.pickled_code)
+    remote_path = f"{staging_location}/udfs/{udf.name}.pickle"
+    staging_client.upload_fileobj(
+        pickled_code_fp, f"{udf.name}.pickle", remote_uri=urlparse(remote_path)
+    )
+
+    feature_table.labels.update(
+        {
+            "_validation": json.dumps(
+                dict(
+                    name=udf.name,
+                    pickled_code_path=remote_path,
+                    include_archive_path=include_py_libs,
+                )
+            ),
+            "_streaming_trigger_secs": str(validation_window_secs),
+        }
+    )
+    client.apply_feature_table(feature_table)