Skip to content

Commit dd36544

Browse files
authored
Report observed value for aggregated checks in pre-ingestion feature validation (feast-dev#1278)
* report observed value for aggregation checks Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * fix kubectl version in ci docker Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * fix kubectl version in ci docker Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com> * fix kubectl version in ci docker Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 56d8959 commit dd36544

4 files changed

Lines changed: 34 additions & 22 deletions

File tree

infra/docker/ci/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ RUN PROTOC_ZIP=protoc-${PROTOC_VERSION}-linux-x86_64.zip && \
7777
RUN curl -sL https://aka.ms/InstallAzureCLIDeb | bash
7878

7979
# Install kubectl
80-
RUN apt-get install -y kubectl=1.20.1-00
80+
RUN apt-get install -y kubectl=1.20.2-00
8181

8282
# Install helm
8383
RUN curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 && \

sdk/python/feast/contrib/validation/ge.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,9 @@ def udf(df: pd.DataFrame) -> pd.Series:
109109
if check.success:
110110
continue
111111

112-
unexpected_count = (
113-
check.result["unexpected_count"]
114-
if "unexpected_count" in check.result
115-
else df.shape[0]
116-
)
112+
if check.exception_info["raised_exception"]:
113+
# ToDo: probably we should mark all rows as invalid
114+
continue
117115

118116
check_kwargs = check.expectation_config.kwargs
119117
check_kwargs.pop("result_format", None)
@@ -126,21 +124,31 @@ def udf(df: pd.DataFrame) -> pd.Series:
126124
]
127125
)
128126

129-
reporter.increment(
130-
"feast_feature_validation_check_failed",
131-
value=unexpected_count,
132-
tags=[
133-
f"feature_table:{os.getenv('FEAST_INGESTION_FEATURE_TABLE', 'unknown')}",
134-
f"project:{os.getenv('FEAST_INGESTION_PROJECT_NAME', 'default')}",
135-
f"check:{check_name}",
136-
],
137-
)
138-
139-
if check.exception_info["raised_exception"]:
140-
# ToDo: probably we should mark all rows as invalid
141-
continue
127+
if "unexpected_count" in check.result:
128+
reporter.increment(
129+
"feast_feature_validation_check_failed",
130+
value=check.result["unexpected_count"],
131+
tags=[
132+
f"feature_table:{os.getenv('FEAST_INGESTION_FEATURE_TABLE', 'unknown')}",
133+
f"project:{os.getenv('FEAST_INGESTION_PROJECT_NAME', 'default')}",
134+
f"check:{check_name}",
135+
],
136+
)
142137

143-
valid_rows.iloc[check.result["unexpected_index_list"]] = False
138+
valid_rows.iloc[check.result["unexpected_index_list"]] = False
139+
140+
elif "observed_value" in check.result:
141+
reporter.increment(
142+
"feast_feature_validation_observed_value",
143+
value=int(
144+
check.result["observed_value"] * 100
145+
), # storing as decimal with precision 2
146+
tags=[
147+
f"feature_table:{os.getenv('FEAST_INGESTION_FEATURE_TABLE', 'unknown')}",
148+
f"project:{os.getenv('FEAST_INGESTION_PROJECT_NAME', 'default')}",
149+
f"check:{check_name}",
150+
],
151+
)
144152

145153
return valid_rows
146154

sdk/python/feast/job_service.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Dict, List, Tuple, cast
99

1010
import grpc
11+
from google.api_core.exceptions import FailedPrecondition
1112
from google.protobuf.timestamp_pb2 import Timestamp
1213

1314
import feast
@@ -334,7 +335,10 @@ def ensure_stream_ingestion_jobs(client: feast.Client, all_projects: bool):
334335
logging.info(
335336
f"Cancelling a stream ingestion job with job_hash={job_hash} job_id={job.get_id()} status={job.get_status()}"
336337
)
337-
job.cancel()
338+
try:
339+
job.cancel()
340+
except FailedPrecondition as exc:
341+
logging.warning(f"Job canceling failed with exception {exc}")
338342

339343
for job_hash in job_hashes_to_start:
340344
# Any job that we wish to start should be among expected table refs map

sdk/python/feast/staging/entities.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def create_bq_view_of_joined_features_and_entities(
123123
view.view_query = JOIN_TEMPLATE.format(
124124
entities=entities_ref,
125125
source=source_ref,
126-
entity_key=",".join([f"source.{e} = entities.{e}" for e in entity_names]),
126+
entity_key=" AND ".join([f"source.{e} = entities.{e}" for e in entity_names]),
127127
)
128128
view.expires = datetime.now() + timedelta(days=1)
129129
bq_client.create_table(view)

0 commit comments

Comments
 (0)