Skip to content

Commit 21f1ef7

Browse files
authored
Fix list feature format for BigQuery offline datasources. (feast-dev#1889)
* Enable test for list features in BQ Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Remove specific handling of BQ list types Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Enable Parquet list inference for BigQuery Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Enable `use_compliant_nested_type` Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Add potentially missing import Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Upload all data to BQ in ARRAY safe manner Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Handle empty list in `python_value_to_proto_value` Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
1 parent af219e7 commit 21f1ef7

6 files changed

Lines changed: 74 additions & 60 deletions

File tree

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import numpy as np
66
import pandas as pd
77
import pyarrow
8+
import pyarrow.parquet
89
from pydantic import StrictStr
910
from pydantic.typing import Literal
1011
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
@@ -222,11 +223,8 @@ def to_bigquery(
222223
job_config = bigquery.QueryJobConfig(destination=path)
223224

224225
if not job_config.dry_run and self.on_demand_feature_views is not None:
225-
transformed_df = self.to_df()
226-
job = self.client.load_table_from_dataframe(
227-
transformed_df,
228-
job_config.destination,
229-
job_config=bigquery.LoadJobConfig(),
226+
job = _write_pyarrow_table_to_bq(
227+
self.client, self.to_arrow(), job_config.destination
230228
)
231229
job.result()
232230
print(f"Done writing to '{job_config.destination}'.")
@@ -331,12 +329,7 @@ def _upload_entity_df_and_get_entity_schema(
331329
elif isinstance(entity_df, pd.DataFrame):
332330
# Drop the index so that we dont have unnecessary columns
333331
entity_df.reset_index(drop=True, inplace=True)
334-
335-
# Upload the dataframe into BigQuery, creating a temporary table
336-
job_config = bigquery.LoadJobConfig()
337-
job = client.load_table_from_dataframe(
338-
entity_df, table_name, job_config=job_config
339-
)
332+
job = _write_df_to_bq(client, entity_df, table_name)
340333
block_until_done(client, job)
341334

342335
entity_schema = dict(zip(entity_df.columns, entity_df.dtypes))
@@ -371,6 +364,44 @@ def _get_bigquery_client(project: Optional[str] = None):
371364
return client
372365

373366

367+
def _write_df_to_bq(
    client: bigquery.Client, df: pd.DataFrame, table_name: str
) -> bigquery.LoadJob:
    """Start a BigQuery load job uploading *df* into *table_name*.

    The dataframe is converted to a pyarrow Table and routed through
    ``_write_pyarrow_table_to_bq``, which serializes it as Parquet with
    compliant nested types so BigQuery maps list columns to
    ARRAY<value_type> instead of a nested STRUCT encoding.
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
    https://github.com/googleapis/python-bigquery/issues/19

    Returns the (not yet completed) LoadJob; callers must wait on it.
    """
    # Delegate to the pyarrow-table variant rather than repeating the
    # Parquet serialization logic here.
    return _write_pyarrow_table_to_bq(
        client, pyarrow.Table.from_pandas(df), table_name
    )
378+
379+
380+
def _write_pyarrow_table_to_bq(
    client: bigquery.Client, table: pyarrow.Table, table_name: str
) -> bigquery.LoadJob:
    """Serialize *table* to Parquet in memory and start a BigQuery load job.

    Parquet is written with ``use_compliant_nested_type=True`` so that list
    columns arrive in BigQuery as ARRAY<value_type> rather than a nested
    STRUCT representation. See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions
    https://github.com/googleapis/python-bigquery/issues/19
    """
    buffer_stream = pyarrow.BufferOutputStream()
    pyarrow.parquet.write_table(
        table, buffer_stream, use_compliant_nested_type=True
    )
    parquet_buf = buffer_stream.getvalue()
    return _write_pyarrow_buffer_to_bq(client, parquet_buf, table_name)
389+
390+
391+
def _write_pyarrow_buffer_to_bq(
    client: bigquery.Client, buf: pyarrow.Buffer, table_name: str
) -> bigquery.LoadJob:
    """Load a Parquet-encoded pyarrow buffer into the BigQuery table *table_name*.

    List inference is enabled so Parquet LIST logical types become BigQuery
    ARRAY columns. Returns the started LoadJob; the caller is responsible
    for waiting on it.
    """
    # BigQuery must be told explicitly that the stream is Parquet and that
    # compliant nested (LIST) types should be inferred as ARRAYs.
    list_options = bigquery.format_options.ParquetOptions()
    list_options.enable_list_inference = True

    load_config = bigquery.LoadJobConfig()
    load_config.source_format = bigquery.SourceFormat.PARQUET
    load_config.parquet_options = list_options

    return client.load_table_from_file(
        pyarrow.BufferReader(buf), table_name, job_config=load_config
    )
403+
404+
374405
# TODO: Optimizations
375406
# * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly
376407
# * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe

sdk/python/feast/type_map.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -277,11 +277,16 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
277277
def python_value_to_proto_value(
    value: Any, feature_type: ValueType = None
) -> ProtoValue:
    """Convert a native Python value to its protobuf ``Value`` representation.

    The Feast value type is normally inferred from *value* itself; the
    explicit *feature_type* is used only as a fallback when inference is
    impossible — i.e. when *value* is None, or when it is an empty
    list/ndarray (an empty sequence carries no element to infer a type from).
    """
    # Start from the declared feature type and override it whenever the
    # value itself can tell us its type. This flattens the original nested
    # conditional, which spelled out the feature_type fallback twice.
    value_type = feature_type
    if value is not None:
        is_empty_sequence = (
            isinstance(value, (list, np.ndarray)) and len(value) == 0
        )
        if not is_empty_sequence:
            value_type = python_type_to_feast_value_type("", value)
    return _python_value_to_proto_value(value_type, value)
286291

287292

sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
from feast import BigQuerySource
88
from feast.data_source import DataSource
9-
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
9+
from feast.infra.offline_stores.bigquery import (
10+
BigQueryOfflineStoreConfig,
11+
_write_df_to_bq,
12+
)
1013
from tests.integration.feature_repos.universal.data_source_creator import (
1114
DataSourceCreator,
1215
)
@@ -61,15 +64,12 @@ def create_data_source(
6164

6265
self.create_dataset()
6366

64-
job_config = bigquery.LoadJobConfig()
6567
if self.gcp_project not in destination_name:
6668
destination_name = (
6769
f"{self.gcp_project}.{self.project_name}.{destination_name}"
6870
)
6971

70-
job = self.client.load_table_from_dataframe(
71-
df, destination_name, job_config=job_config
72-
)
72+
job = _write_df_to_bq(self.client, df, destination_name)
7373
job.result()
7474

7575
self.tables.append(destination_name)

sdk/python/tests/integration/offline_store/test_historical_retrieval.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
from feast.feature import Feature
2020
from feast.feature_store import FeatureStore, _validate_feature_refs
2121
from feast.feature_view import FeatureView
22-
from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig
22+
from feast.infra.offline_stores.bigquery import (
23+
BigQueryOfflineStoreConfig,
24+
_write_df_to_bq,
25+
)
2326
from feast.infra.offline_stores.offline_utils import (
2427
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
2528
)
@@ -62,9 +65,8 @@ def stage_driver_hourly_stats_parquet_source(directory, df):
6265

6366
def stage_driver_hourly_stats_bigquery_source(df, table_id):
    """Upload the driver hourly stats dataframe to the given BigQuery table."""
    bq_client = bigquery.Client()
    # Drop the pandas index so it does not become a spurious column.
    df.reset_index(drop=True, inplace=True)
    # Block until the load job finishes so the table is ready for readers.
    _write_df_to_bq(bq_client, df, table_id).result()
6971

7072

@@ -99,9 +101,8 @@ def feature_service(name: str, views) -> FeatureService:
99101

100102
def stage_customer_daily_profile_bigquery_source(df, table_id):
    """Upload the customer daily profile dataframe to the given BigQuery table."""
    bq_client = bigquery.Client()
    # Drop the pandas index so it does not become a spurious column.
    df.reset_index(drop=True, inplace=True)
    # Block until the load job finishes so the table is ready for readers.
    _write_df_to_bq(bq_client, df, table_id).result()
106107

107108

@@ -231,9 +232,8 @@ def get_expected_training_df(
231232

232233
def stage_orders_bigquery(df, table_id):
    """Upload the orders dataframe to the given BigQuery table."""
    bq_client = bigquery.Client()
    # Drop the pandas index so it does not become a spurious column.
    df.reset_index(drop=True, inplace=True)
    # Block until the load job finishes so the table is ready for readers.
    _write_df_to_bq(bq_client, df, table_id).result()
238238

239239

sdk/python/tests/integration/registration/test_universal_types.py

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,6 @@ def populate_test_configs(offline: bool):
3636
# For offline tests, don't need to vary for online store
3737
if offline and test_repo_config.online_store == REDIS_CONFIG:
3838
continue
39-
# TODO(https://github.com/feast-dev/feast/issues/1839): Fix BQ materialization of list features
40-
if (
41-
not offline
42-
and test_repo_config.provider == "gcp"
43-
and feature_is_list is True
44-
):
45-
continue
4639
configs.append(
4740
TypeTestConfig(
4841
entity_type=entity_type,
@@ -255,16 +248,10 @@ def assert_feature_list_types(
255248
"bool": "bool",
256249
}
257250
assert str(historical_features_df.dtypes["value"]) == "object"
258-
if provider == "gcp":
259-
assert (
260-
feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
261-
in type(historical_features_df.value[0]["list"][0]["item"]).__name__
262-
)
263-
else:
264-
assert (
265-
feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
266-
in type(historical_features_df.value[0][0]).__name__
267-
)
251+
assert (
252+
feature_list_dtype_to_expected_historical_feature_list_dtype[feature_dtype]
253+
in type(historical_features_df.value[0][0]).__name__
254+
)
268255

269256

270257
def assert_expected_arrow_types(
@@ -287,18 +274,10 @@ def assert_expected_arrow_types(
287274
feature_dtype
288275
]
289276
if feature_is_list:
290-
if provider == "gcp":
291-
assert str(
292-
historical_features_arrow.schema.field_by_name("value").type
293-
) in [
294-
f"struct<list: list<item: struct<item: {arrow_type}>> not null>",
295-
f"struct<list: list<item: struct<item: {arrow_type}>>>",
296-
]
297-
else:
298-
assert (
299-
str(historical_features_arrow.schema.field_by_name("value").type)
300-
== f"list<item: {arrow_type}>"
301-
)
277+
assert (
278+
str(historical_features_arrow.schema.field_by_name("value").type)
279+
== f"list<item: {arrow_type}>"
280+
)
302281
else:
303282
assert (
304283
str(historical_features_arrow.schema.field_by_name("value").type)

sdk/python/tests/utils/data_source_utils.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from feast import BigQuerySource, FileSource
99
from feast.data_format import ParquetFormat
10+
from feast.infra.offline_stores.bigquery import _write_df_to_bq
1011

1112

1213
@contextlib.contextmanager
@@ -38,9 +39,7 @@ def simple_bq_source_using_table_ref_arg(
3839
client.update_dataset(dataset, ["default_table_expiration_ms"])
3940
table_ref = f"{gcp_project}.{bigquery_dataset}.table_{random.randrange(100, 999)}"
4041

41-
job = client.load_table_from_dataframe(
42-
df, table_ref, job_config=bigquery.LoadJobConfig()
43-
)
42+
job = _write_df_to_bq(client, df, table_ref)
4443
job.result()
4544

4645
return BigQuerySource(

0 commit comments

Comments
 (0)