
Commit 4c1cc09

In Historical Retrieval from BQ join between source & entities is performed inside BQ (#1110)
* create bq view with join query
* import batch_source fixture
* bq dataset fixture
* use bq dataset fixture through request
* move bq package to standalone launcher
* revert extra options
* e2e fail fast
* fix dataset id
* fix bq source
* serving to wait core is running
* delete contents
* fix staging functions
* add bq jar
* add created_timestamp
* test ingestion from bq view
* make test-integration executable
* add tests timeout
* fix bq staging test
* fix test online e2e
* format
* wait until bq table created
* fix verify
* add explicit data sources to historical retrieval via JS
* debug
* correct dataframe ingested
* debug
* boundaries from original df
* cleanup & online retrieval timeout
* bq replacement moved to launcher
* add passing it test to pass checks
* add timeout to expected arguments
* rerun client fixture on auth switch
* enable_auth tombstone
* update default timeout
* check entities mapping

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
1 parent 2a2d2b5 commit 4c1cc09

23 files changed

Lines changed: 469 additions & 184 deletions
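At a high level, instead of having Spark export entire BigQuery feature tables and join them with entities on the cluster, the launcher now stages the entity rows into BigQuery and creates a BQ view that pre-joins feature rows with those entities, so only the relevant slice leaves the warehouse. A rough sketch of the idea (the actual SQL is generated by create_bq_view_of_joined_features_and_entities in feast.staging.entities; all names below are made up):

    # Hypothetical illustration only; Feast generates its own view SQL.
    def joined_view_sql(feature_table: str, entity_table: str, entities: list) -> str:
        join_cond = " AND ".join(f"f.{e} = e.{e}" for e in entities)
        return (
            f"SELECT f.* FROM `{feature_table}` f "
            f"JOIN `{entity_table}` e ON {join_cond}"
        )

    # joined_view_sql("kf-feast.ds.driver_stats", "kf-feast.ds.entities_tmp", ["driver_id"])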


infra/scripts/test-end-to-end-gcp.sh

Lines changed: 3 additions & 2 deletions
@@ -10,7 +10,8 @@ python -m pip install --upgrade pip setuptools wheel
 make install-python
 python -m pip install -qr tests/requirements.txt
 
-su -p postgres -c "PATH=$PATH HOME=/tmp pytest tests/e2e/ \
+su -p postgres -c "PATH=$PATH HOME=/tmp pytest -v tests/e2e/ \
     --feast-version develop --env=gcloud --dataproc-cluster-name feast-e2e \
     --dataproc-project kf-feast --dataproc-region us-central1 \
-    --redis-url 10.128.0.105:6379 --redis-cluster --kafka-brokers 10.128.0.103:9094"
+    --redis-url 10.128.0.105:6379 --redis-cluster --kafka-brokers 10.128.0.103:9094 \
+    --bq-project kf-feast"

infra/scripts/test-end-to-end.sh

Lines changed: 1 addition & 1 deletion
@@ -7,4 +7,4 @@ python -m pip install --upgrade pip setuptools wheel
 make install-python
 python -m pip install -qr tests/requirements.txt
 
-su -p postgres -c "PATH=$PATH HOME=/tmp pytest tests/e2e/ --feast-version develop"
+su -p postgres -c "PATH=$PATH HOME=/tmp pytest -v tests/e2e/ --feast-version develop"

infra/scripts/test-integration.sh

File mode changed: 100644 → 100755 (script made executable)

protos/feast/core/JobService.proto

Lines changed: 3 additions & 0 deletions
@@ -128,6 +128,9 @@ message GetHistoricalFeaturesRequest {
   // Export to AWS S3 - s3://path/to/features
   // Export to GCP GCS - gs://path/to/features
   string output_location = 4;
+
+  // Specify the format name for the output, e.g. parquet
+  string output_format = 5;
 }
 
 message GetHistoricalFeaturesResponse {
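For context, a minimal sketch of how a caller might populate the new field, assuming the generated Python stubs (the feature ref and bucket below are hypothetical):

    from feast.core.JobService_pb2 import GetHistoricalFeaturesRequest

    request = GetHistoricalFeaturesRequest(
        feature_refs=["driver_statistics:avg_daily_trips"],  # hypothetical feature ref
        project="default",
        output_location="gs://my-bucket/path/to/features",   # assumed bucket
        output_format="parquet",                             # the new field
        # entity_source omitted for brevity
    )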

sdk/python/feast/client.py

Lines changed: 30 additions & 25 deletions
@@ -15,12 +15,10 @@
 import multiprocessing
 import os
 import shutil
-import tempfile
 import uuid
 from datetime import datetime
 from itertools import groupby
 from typing import Any, Dict, List, Optional, Union
-from urllib.parse import urlparse
 
 import grpc
 import pandas as pd
@@ -101,7 +99,11 @@
     GetOnlineFeaturesRequestV2,
 )
 from feast.serving.ServingService_pb2_grpc import ServingServiceStub
-from feast.staging.storage_client import get_staging_client
+from feast.staging.entities import (
+    stage_entities_to_bq,
+    stage_entities_to_fs,
+    table_reference_from_string,
+)
 
 _logger = logging.getLogger(__name__)
 
@@ -855,6 +857,7 @@ def get_online_features(
                 entity_rows=_infer_online_entity_rows(entity_rows),
                 project=project if project is not None else self.project,
             ),
+            timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY),
             metadata=self._get_grpc_metadata(),
         )
     except grpc.RpcError as e:
@@ -879,8 +882,11 @@ def get_historical_features(
             "feature_table:feature" where "feature_table" & "feature" refer to
             the feature and feature table names respectively.
         entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]): Source for the entity rows.
-            If entity_source is a Panda DataFrame, the dataframe will be exported to the staging
-            location as parquet file. It is also assumed that the column event_timestamp is present
+            If entity_source is a Pandas DataFrame, the dataframe will be staged
+            to become accessible by Spark workers.
+            If one of the feature tables' sources is in BigQuery, entities will be uploaded to BQ.
+            Otherwise they go to remote file storage (derived from the configured staging location).
+            It is also assumed that the column event_timestamp is present
             in the dataframe, and is of type datetime without timezone information.
 
         The user needs to make sure that the source (or staging location, if entity_source is
@@ -916,25 +922,27 @@ def get_historical_features(
             str(uuid.uuid4()),
         )
         output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT)
+        feature_sources = [
+            feature_table.batch_source for feature_table in feature_tables
+        ]
 
        if isinstance(entity_source, pd.DataFrame):
-            staging_location = self._config.get(CONFIG_SPARK_STAGING_LOCATION)
-            entity_staging_uri = urlparse(
-                os.path.join(staging_location, str(uuid.uuid4()))
-            )
-            staging_client = get_staging_client(entity_staging_uri.scheme)
-            with tempfile.NamedTemporaryFile() as df_export_path:
-                entity_source.to_parquet(df_export_path.name)
-                bucket = (
-                    None
-                    if entity_staging_uri.scheme == "file"
-                    else entity_staging_uri.netloc
+            if any(isinstance(source, BigQuerySource) for source in feature_sources):
+                first_bq_source = [
+                    source
+                    for source in feature_sources
+                    if isinstance(source, BigQuerySource)
+                ][0]
+                source_ref = table_reference_from_string(
+                    first_bq_source.bigquery_options.table_ref
                 )
-                staging_client.upload_file(
-                    df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
+                entity_source = stage_entities_to_bq(
+                    entity_source, source_ref.project, source_ref.dataset_id
                 )
-                entity_source = FileSource(
-                    "event_timestamp", ParquetFormat(), entity_staging_uri.geturl(),
+            else:
+                entity_source = stage_entities_to_fs(
+                    entity_source,
+                    staging_location=self._config.get(CONFIG_SPARK_STAGING_LOCATION),
                 )
 
         if self._use_job_service:
@@ -943,6 +951,7 @@ def get_historical_features(
                 feature_refs=feature_refs,
                 entity_source=entity_source.to_proto(),
                 project=project,
+                output_format=output_format,
                 output_location=output_location,
             ),
             **self._extra_grpc_params(),
@@ -955,11 +964,7 @@ def get_historical_features(
             )
         else:
             return start_historical_feature_retrieval_job(
-                self,
-                entity_source,
-                feature_tables,
-                output_format,
-                os.path.join(output_location, str(uuid.uuid4())),
+                self, entity_source, feature_tables, output_format, output_location,
             )
 
     def get_historical_features_df(
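For orientation, a minimal sketch of how the new staging path is exercised from user code (endpoint, feature ref, and DataFrame contents are hypothetical):

    import pandas as pd
    from feast import Client

    client = Client(core_url="localhost:6565")  # assumed Core endpoint

    # Entity rows; event_timestamp must be a timezone-naive datetime column.
    entity_df = pd.DataFrame(
        {
            "driver_id": [1001, 1002],
            "event_timestamp": [pd.Timestamp("2020-10-01"), pd.Timestamp("2020-10-02")],
        }
    )

    # If any requested feature table is backed by a BigQuerySource, the DataFrame
    # is staged to a temporary BQ table next to that source; otherwise it is
    # written as parquet to the configured staging location.
    job = client.get_historical_features(
        feature_refs=["driver_statistics:avg_daily_trips"],  # hypothetical
        entity_source=entity_df,
    )
    output_uri = job.get_output_file_uri()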

sdk/python/feast/job_service.py

Lines changed: 10 additions & 5 deletions
@@ -7,6 +7,7 @@
 from feast.core import JobService_pb2_grpc
 from feast.core.JobService_pb2 import (
     CancelJobResponse,
+    GetHistoricalFeaturesRequest,
     GetHistoricalFeaturesResponse,
     GetJobResponse,
 )
@@ -20,6 +21,7 @@
     SparkJobStatus,
     StreamIngestionJob,
 )
+from feast.pyspark.launcher import start_historical_feature_retrieval_job
 from feast.third_party.grpc.health.v1 import HealthService_pb2_grpc
 from feast.third_party.grpc.health.v1.HealthService_pb2 import (
     HealthCheckResponse,
@@ -64,13 +66,16 @@ def StartOfflineToOnlineIngestionJob(self, request, context):
         context.set_details("Method not implemented!")
         raise NotImplementedError("Method not implemented!")
 
-    def GetHistoricalFeatures(self, request, context):
+    def GetHistoricalFeatures(self, request: GetHistoricalFeaturesRequest, context):
         """Produce a training dataset, return a job id that will provide a file reference"""
-        job = self.client.get_historical_features(
-            request.feature_refs,
+        job = start_historical_feature_retrieval_job(
+            client=self.client,
             entity_source=DataSource.from_proto(request.entity_source),
-            project=request.project,
-            output_location=request.output_location,
+            feature_tables=self.client._get_feature_tables_from_feature_refs(
+                list(request.feature_refs), request.project
+            ),
+            output_format=request.output_format,
+            output_path=request.output_location,
         )
 
         output_file_uri = job.get_output_file_uri(block=False)

sdk/python/feast/pyspark/historical_feature_retrieval_job.py

Lines changed: 18 additions & 12 deletions
@@ -149,25 +149,31 @@ def spark_format(self) -> str:
     def spark_path(self) -> str:
         return f"{self.project}:{self.dataset}.{self.table}"
 
+    @property
+    def spark_read_options(self) -> Dict[str, str]:
+        return {**super().spark_read_options, "viewsEnabled": "true"}
+
 
 def _source_from_dict(dct: Dict) -> Source:
     if "file" in dct.keys():
         return FileSource(
-            FileSource.PROTO_FORMAT_TO_SPARK[dct["file"]["format"]["json_class"]],
-            dct["file"]["path"],
-            dct["file"]["event_timestamp_column"],
-            dct["file"].get("created_timestamp_column"),
-            dct["file"].get("field_mapping"),
-            dct["file"].get("options"),
+            format=FileSource.PROTO_FORMAT_TO_SPARK[
+                dct["file"]["format"]["json_class"]
+            ],
+            path=dct["file"]["path"],
+            event_timestamp_column=dct["file"]["event_timestamp_column"],
+            created_timestamp_column=dct["file"].get("created_timestamp_column"),
+            field_mapping=dct["file"].get("field_mapping"),
+            options=dct["file"].get("options"),
         )
     else:
         return BigQuerySource(
-            dct["bq"]["project"],
-            dct["bq"]["dataset"],
-            dct["bq"]["table"],
-            dct["bq"].get("field_mapping", {}),
-            dct["bq"]["event_timestamp_column"],
-            dct["bq"].get("created_timestamp_column"),
+            project=dct["bq"]["project"],
+            dataset=dct["bq"]["dataset"],
+            table=dct["bq"]["table"],
+            field_mapping=dct["bq"].get("field_mapping", {}),
+            event_timestamp_column=dct["bq"]["event_timestamp_column"],
+            created_timestamp_column=dct["bq"].get("created_timestamp_column"),
        )
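As an illustration, a hypothetical serialized source dict that _source_from_dict would turn into a BigQuerySource (all field values are made up):

    bq_source_dict = {
        "bq": {
            "project": "kf-feast",            # example GCP project
            "dataset": "feast_dataset",       # assumed dataset
            "table": "driver_statistics",     # assumed table
            "event_timestamp_column": "event_timestamp",
            "created_timestamp_column": "created_timestamp",
            "field_mapping": {},
        }
    }

    source = _source_from_dict(bq_source_dict)
    assert source.spark_path == "kf-feast:feast_dataset.driver_statistics"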

sdk/python/feast/pyspark/launcher.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
StreamIngestionJob,
3636
StreamIngestionJobParameters,
3737
)
38+
from feast.staging.entities import create_bq_view_of_joined_features_and_entities
3839
from feast.staging.storage_client import get_staging_client
3940
from feast.value_type import ValueType
4041

@@ -106,7 +107,11 @@ def _source_to_argument(source: DataSource):
106107
return {"file": properties}
107108

108109
if isinstance(source, BigQuerySource):
109-
properties["table_ref"] = source.bigquery_options.table_ref
110+
project, dataset_and_table = source.bigquery_options.table_ref.split(":")
111+
dataset, table = dataset_and_table.split(".")
112+
properties["project"] = project
113+
properties["dataset"] = dataset
114+
properties["table"] = table
110115
return {"bq": properties}
111116

112117
if isinstance(source, KafkaSource):
@@ -171,13 +176,17 @@ def start_historical_feature_retrieval_job(
171176
output_path: str,
172177
) -> RetrievalJob:
173178
launcher = resolve_launcher(client._config)
179+
feature_sources = [
180+
_source_to_argument(
181+
replace_bq_table_with_joined_view(feature_table, entity_source)
182+
)
183+
for feature_table in feature_tables
184+
]
185+
174186
return launcher.historical_feature_retrieval(
175187
RetrievalJobParameters(
176188
entity_source=_source_to_argument(entity_source),
177-
feature_tables_sources=[
178-
_source_to_argument(feature_table.batch_source)
179-
for feature_table in feature_tables
180-
],
189+
feature_tables_sources=feature_sources,
181190
feature_tables=[
182191
_feature_table_to_argument(client, feature_table)
183192
for feature_table in feature_tables
@@ -188,6 +197,35 @@ def start_historical_feature_retrieval_job(
188197
)
189198

190199

200+
def replace_bq_table_with_joined_view(
201+
feature_table: FeatureTable, entity_source: Union[FileSource, BigQuerySource],
202+
) -> Union[FileSource, BigQuerySource]:
203+
"""
204+
Applies optimization to historical retrieval. Instead of pulling all data from Batch Source,
205+
with this optimization we join feature values & entities on Data Warehouse side (improving data locality).
206+
Several conditions should be met to enable this optimization:
207+
* entities are staged to BigQuery
208+
* feature values are in in BigQuery
209+
* Entity columns are not mapped (ToDo: fix this limitation)
210+
:return: replacement for feature source
211+
"""
212+
if not isinstance(feature_table.batch_source, BigQuerySource):
213+
return feature_table.batch_source
214+
215+
if not isinstance(entity_source, BigQuerySource):
216+
return feature_table.batch_source
217+
218+
if any(
219+
entity in feature_table.batch_source.field_mapping
220+
for entity in feature_table.entities
221+
):
222+
return feature_table.batch_source
223+
224+
return create_bq_view_of_joined_features_and_entities(
225+
feature_table.batch_source, entity_source, feature_table.entities,
226+
)
227+
228+
191229
def _download_jar(remote_jar: str) -> str:
192230
remote_jar_parts = urlparse(remote_jar)
193231
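To make the new table_ref handling concrete, a small sketch of the project:dataset.table split that _source_to_argument now performs (the reference below is made up):

    table_ref = "kf-feast:feast_dataset.driver_statistics"  # hypothetical

    project, dataset_and_table = table_ref.split(":")
    dataset, table = dataset_and_table.split(".")

    properties = {"project": project, "dataset": dataset, "table": table}
    # -> {"project": "kf-feast", "dataset": "feast_dataset", "table": "driver_statistics"}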

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py

Lines changed: 4 additions & 1 deletion
@@ -104,6 +104,8 @@ class DataprocClusterLauncher(JobLauncher):
     addition to the Feast SDK.
     """
 
+    EXTERNAL_JARS = ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"]
+
     def __init__(
         self, cluster_name: str, staging_location: str, region: str, project_id: str,
     ):
@@ -157,7 +159,7 @@ def dataproc_submit(self, job_params: SparkJobParameters) -> Operation:
         job_config.update(
             {
                 "spark_job": {
-                    "jar_file_uris": [main_file_uri],
+                    "jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
                     "main_class": job_params.get_class_name(),
                     "args": job_params.get_arguments(),
                 }
@@ -168,6 +170,7 @@ def dataproc_submit(self, job_params: SparkJobParameters) -> Operation:
             {
                 "pyspark_job": {
                     "main_python_file_uri": main_file_uri,
+                    "jar_file_uris": self.EXTERNAL_JARS,
                     "args": job_params.get_arguments(),
                 }
             }
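For reference, a rough sketch of the resulting pyspark_job payload submitted to the Dataproc Jobs API after this change (the staging URI is hypothetical and the args are elided):

    job_config = {
        "pyspark_job": {
            "main_python_file_uri": "gs://my-staging/historical_feature_retrieval_job.py",
            # The BigQuery connector is now shipped alongside every job:
            "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
            "args": [],  # filled from job_params.get_arguments()
        }
    }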

sdk/python/feast/pyspark/launchers/standalone/local.py

Lines changed: 19 additions & 0 deletions
@@ -148,6 +148,8 @@ class StandaloneClusterLauncher(JobLauncher):
     Submits jobs to a standalone Spark cluster in client mode.
     """
 
+    BQ_CONNECTOR_VERSION = "2.12:0.17.3"
+
     def __init__(self, master_url: str, spark_home: str = None):
         """
         This launcher executes the spark-submit script in a subprocess. The subprocess
@@ -184,6 +186,23 @@ def spark_submit(
         if ui_port:
             submission_cmd.extend(["--conf", f"spark.ui.port={ui_port}"])
 
+        # Workaround for https://github.com/apache/spark/pull/26552
+        # Fix running spark job with bigquery connector (w/ shadowing) on JDK 9+
+        submission_cmd.extend(
+            [
+                "--conf",
+                "spark.executor.extraJavaOptions="
+                "-Dcom.google.cloud.spark.bigquery.repackaged.io.netty.tryReflectionSetAccessible=true -Duser.timezone=GMT",
+                "--conf",
+                "spark.driver.extraJavaOptions="
+                "-Dcom.google.cloud.spark.bigquery.repackaged.io.netty.tryReflectionSetAccessible=true -Duser.timezone=GMT",
+                "--conf",
+                "spark.sql.session.timeZone=UTC",  # ignore local timezone
+                "--packages",
+                f"com.google.cloud.spark:spark-bigquery-with-dependencies_{self.BQ_CONNECTOR_VERSION}",
+            ]
+        )
+
         if job_params.get_extra_options():
             submission_cmd.extend(job_params.get_extra_options().split(" "))
 
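Spelled out, the options appended above amount to the following extra spark-submit arguments (a sketch; the real list is built inside spark_submit):

    netty_workaround = (
        "-Dcom.google.cloud.spark.bigquery.repackaged.io.netty."
        "tryReflectionSetAccessible=true -Duser.timezone=GMT"
    )
    extra_args = [
        "--conf", f"spark.executor.extraJavaOptions={netty_workaround}",
        "--conf", f"spark.driver.extraJavaOptions={netty_workaround}",
        "--conf", "spark.sql.session.timeZone=UTC",
        "--packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.3",
    ]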
