Skip to content

Commit de050ab

Browse files
Jwrederpathade
authored andcommitted
fix(bigquery): Enable list inference for parquet loads in offline_write_batch
When pushing features with array/list types (e.g. STRING_LIST) to BigQuery via offline_write_batch, the data arrives as empty arrays because BigQuery's parquet loader does not infer list structure by default. Set parquet_options.enable_list_inference = True on the LoadJobConfig so array columns are written correctly. Fixes feast-dev#5845 Signed-off-by: Jonathan Wrede <wrede.jonathan00@gmail.com> Signed-off-by: RutujaPathade <73137503+RutujaPathade@users.noreply.github.com>
1 parent c9e368b commit de050ab

2 files changed

Lines changed: 65 additions & 0 deletions

File tree

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,11 +434,15 @@ def offline_write_batch(
434434
location=config.offline_store.location,
435435
)
436436

437+
parquet_options = bigquery.ParquetOptions()
438+
parquet_options.enable_list_inference = True
439+
437440
job_config = bigquery.LoadJobConfig(
438441
source_format=bigquery.SourceFormat.PARQUET,
439442
schema=arrow_schema_to_bq_schema(pa_schema),
440443
create_disposition=config.offline_store.table_create_disposition,
441444
write_disposition="WRITE_APPEND", # Default but included for clarity
445+
parquet_options=parquet_options,
442446
)
443447

444448
with tempfile.TemporaryFile() as parquet_temp_file:

sdk/python/tests/unit/infra/offline_stores/test_bigquery.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,64 @@ def test_table_property_unaffected_by_query_priority(self):
200200
timestamp_field="ts",
201201
)
202202
assert source.table == "project.dataset.write_target"
203+
204+
205+
class TestOfflineWriteBatch:
206+
@patch("feast.infra.offline_stores.bigquery._get_bigquery_client")
207+
def test_offline_write_batch_enables_list_inference(self, mock_get_client):
208+
"""LoadJobConfig must set parquet_options.enable_list_inference = True
209+
so that BigQuery correctly interprets PyArrow list columns from parquet.
210+
"""
211+
from unittest.mock import MagicMock
212+
213+
source = BigQuerySource(
214+
name="test",
215+
table="project.dataset.table",
216+
timestamp_field="ts",
217+
)
218+
fv = MagicMock()
219+
fv.batch_source = source
220+
221+
pa_schema = pyarrow.schema(
222+
[
223+
pyarrow.field("entity_id", pyarrow.string()),
224+
pyarrow.field("tags", pyarrow.list_(pyarrow.string())),
225+
pyarrow.field("ts", pyarrow.timestamp("us", tz="UTC")),
226+
]
227+
)
228+
pa_table = pyarrow.table(
229+
{
230+
"entity_id": ["e1"],
231+
"tags": [["a", "b"]],
232+
"ts": [datetime(2024, 1, 1, tzinfo=timezone.utc)],
233+
},
234+
schema=pa_schema,
235+
)
236+
237+
mock_client = MagicMock()
238+
mock_get_client.return_value = mock_client
239+
mock_client.load_table_from_file.return_value = MagicMock()
240+
241+
config = RepoConfig(
242+
registry="gs://test/registry.db",
243+
project="test",
244+
provider="gcp",
245+
offline_store=BigQueryOfflineStoreConfig(project_id="test-project"),
246+
online_store=SqliteOnlineStoreConfig(),
247+
)
248+
249+
with patch(
250+
"feast.infra.offline_stores.offline_utils.get_pyarrow_schema_from_batch_source",
251+
return_value=(pa_schema, pa_table.column_names),
252+
):
253+
BigQueryOfflineStore.offline_write_batch(
254+
config=config,
255+
feature_view=fv,
256+
table=pa_table,
257+
progress=None,
258+
)
259+
260+
call_kwargs = mock_client.load_table_from_file.call_args
261+
job_config = call_kwargs[1]["job_config"]
262+
assert job_config.parquet_options is not None
263+
assert job_config.parquet_options.enable_list_inference is True

0 commit comments

Comments
 (0)