Skip to content

Commit f1e3072

Browse files
authored
Support pulling query from BigQuery (#1403)
* WIP pull query from BQ Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Add test Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Use read API instead of internal method Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Use separate project per test instance Signed-off-by: Jacob Klegar <jacob@tecton.ai> * Remove extraneous online store config Signed-off-by: Jacob Klegar <jacob@tecton.ai>
1 parent 0dbd905 commit f1e3072

File tree

5 files changed

+146
-109
lines changed

5 files changed

+146
-109
lines changed

sdk/python/feast/data_source.py

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -171,7 +171,7 @@ def to_proto(self) -> DataSourceProto.BigQueryOptions:
171171
"""
172172

173173
bigquery_options_proto = DataSourceProto.BigQueryOptions(
174-
table_ref=self.table_ref,
174+
table_ref=self.table_ref, query=self.query,
175175
)
176176

177177
return bigquery_options_proto
@@ -461,13 +461,16 @@ def from_proto(data_source):
461461
created_timestamp_column=data_source.created_timestamp_column,
462462
date_partition_column=data_source.date_partition_column,
463463
)
464-
elif data_source.bigquery_options.table_ref:
464+
elif (
465+
data_source.bigquery_options.table_ref or data_source.bigquery_options.query
466+
):
465467
data_source_obj = BigQuerySource(
466468
field_mapping=data_source.field_mapping,
467469
table_ref=data_source.bigquery_options.table_ref,
468470
event_timestamp_column=data_source.event_timestamp_column,
469471
created_timestamp_column=data_source.created_timestamp_column,
470472
date_partition_column=data_source.date_partition_column,
473+
query=data_source.bigquery_options.query,
471474
)
472475
elif (
473476
data_source.kafka_options.bootstrap_servers

sdk/python/feast/feature_store.py

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -229,10 +229,6 @@ def materialize(
229229
raise NotImplementedError(
230230
"This function is not yet implemented for File data sources"
231231
)
232-
if not feature_view.input.table_ref:
233-
raise NotImplementedError(
234-
f"This function is only implemented for FeatureViews with a table_ref; {feature_view.name} does not have one."
235-
)
236232
(
237233
entity_names,
238234
feature_names,
@@ -241,7 +237,7 @@ def materialize(
241237
) = _run_reverse_field_mapping(feature_view)
242238

243239
offline_store = get_offline_store(self.config)
244-
table = offline_store.pull_latest_from_table(
240+
table = offline_store.pull_latest_from_table_or_query(
245241
feature_view.input,
246242
entity_names,
247243
feature_names,

sdk/python/feast/offline_store.py

Lines changed: 8 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ class OfflineStore(ABC):
8686

8787
@staticmethod
8888
@abstractmethod
89-
def pull_latest_from_table(
89+
def pull_latest_from_table_or_query(
9090
data_source: DataSource,
9191
entity_names: List[str],
9292
feature_names: List[str],
@@ -115,7 +115,7 @@ def get_historical_features(
115115

116116
class BigQueryOfflineStore(OfflineStore):
117117
@staticmethod
118-
def pull_latest_from_table(
118+
def pull_latest_from_table_or_query(
119119
data_source: DataSource,
120120
entity_names: List[str],
121121
feature_names: List[str],
@@ -125,11 +125,10 @@ def pull_latest_from_table(
125125
end_date: datetime,
126126
) -> pyarrow.Table:
127127
assert isinstance(data_source, BigQuerySource)
128-
table_ref = data_source.table_ref
129-
if table_ref is None:
130-
raise ValueError(
131-
"This function can only be called on a FeatureView with a table_ref"
132-
)
128+
if data_source.table_ref:
129+
from_expression = f"`{data_source.table_ref}`"
130+
else:
131+
from_expression = f"({data_source.query})"
133132

134133
partition_by_entity_string = ", ".join(entity_names)
135134
if partition_by_entity_string != "":
@@ -145,7 +144,7 @@ def pull_latest_from_table(
145144
FROM (
146145
SELECT {field_string},
147146
ROW_NUMBER() OVER({partition_by_entity_string} ORDER BY {timestamp_desc_string}) AS _feast_row
148-
FROM `{table_ref}`
147+
FROM {from_expression}
149148
WHERE {event_timestamp_column} BETWEEN TIMESTAMP('{start_date}') AND TIMESTAMP('{end_date}')
150149
)
151150
WHERE _feast_row = 1
@@ -287,7 +286,7 @@ def build_point_in_time_query(
287286

288287
class FileOfflineStore(OfflineStore):
289288
@staticmethod
290-
def pull_latest_from_table(
289+
def pull_latest_from_table_or_query(
291290
data_source: DataSource,
292291
entity_names: List[str],
293292
feature_names: List[str],

sdk/python/tests/test_bigquery_ingestion.py

Lines changed: 0 additions & 93 deletions
This file was deleted.
Lines changed: 132 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,132 @@
1+
import time
2+
from datetime import datetime, timedelta
3+
4+
import pandas as pd
5+
import pytest
6+
from google.cloud import bigquery
7+
8+
from feast.data_source import BigQuerySource
9+
from feast.feature import Feature
10+
from feast.feature_store import FeatureStore
11+
from feast.feature_view import FeatureView
12+
from feast.repo_config import RepoConfig
13+
from feast.value_type import ValueType
14+
15+
16+
@pytest.mark.integration
17+
class TestMaterializeFromBigQueryToDatastore:
18+
def setup_method(self):
19+
self.client = bigquery.Client()
20+
self.gcp_project = self.client.project
21+
self.bigquery_dataset = "test_ingestion"
22+
dataset = bigquery.Dataset(f"{self.gcp_project}.{self.bigquery_dataset}")
23+
self.client.create_dataset(dataset, exists_ok=True)
24+
dataset.default_table_expiration_ms = (
25+
1000 * 60 * 60 * 24 * 14
26+
) # 2 weeks in milliseconds
27+
self.client.update_dataset(dataset, ["default_table_expiration_ms"])
28+
29+
def test_bigquery_table_to_datastore_correctness(self):
30+
# create dataset
31+
ts = pd.Timestamp.now(tz="UTC").round("ms")
32+
data = {
33+
"id": [1, 2, 1],
34+
"value": [0.1, 0.2, 0.3],
35+
"ts_1": [ts - timedelta(minutes=2), ts, ts],
36+
"created_ts": [ts, ts, ts],
37+
}
38+
df = pd.DataFrame.from_dict(data)
39+
40+
# load dataset into BigQuery
41+
job_config = bigquery.LoadJobConfig()
42+
table_id = f"{self.gcp_project}.{self.bigquery_dataset}.table_correctness_{int(time.time())}"
43+
job = self.client.load_table_from_dataframe(df, table_id, job_config=job_config)
44+
job.result()
45+
46+
# create FeatureView
47+
fv = FeatureView(
48+
name="test_bq_table_correctness",
49+
entities=["driver_id"],
50+
features=[Feature("value", ValueType.FLOAT)],
51+
ttl=timedelta(minutes=5),
52+
input=BigQuerySource(
53+
event_timestamp_column="ts",
54+
table_ref=table_id,
55+
created_timestamp_column="created_ts",
56+
field_mapping={"ts_1": "ts", "id": "driver_id"},
57+
date_partition_column="",
58+
),
59+
)
60+
config = RepoConfig(
61+
metadata_store="./metadata.db",
62+
project=f"test_bq_table_correctness_{int(time.time())}",
63+
provider="gcp",
64+
)
65+
fs = FeatureStore(config=config)
66+
fs.apply([fv])
67+
68+
# run materialize()
69+
fs.materialize(
70+
[fv.name],
71+
datetime.utcnow() - timedelta(minutes=5),
72+
datetime.utcnow() - timedelta(minutes=0),
73+
)
74+
75+
# check result of materialize()
76+
response_dict = fs.get_online_features(
77+
[f"{fv.name}:value"], [{"driver_id": 1}]
78+
).to_dict()
79+
assert abs(response_dict[f"{fv.name}:value"][0] - 0.3) < 1e-6
80+
81+
def test_bigquery_query_to_datastore_correctness(self):
82+
# create dataset
83+
ts = pd.Timestamp.now(tz="UTC").round("ms")
84+
data = {
85+
"id": [1, 2, 1],
86+
"value": [0.1, 0.2, 0.3],
87+
"ts_1": [ts - timedelta(minutes=2), ts, ts],
88+
"created_ts": [ts, ts, ts],
89+
}
90+
df = pd.DataFrame.from_dict(data)
91+
92+
# load dataset into BigQuery
93+
job_config = bigquery.LoadJobConfig()
94+
table_id = f"{self.gcp_project}.{self.bigquery_dataset}.query_correctness_{int(time.time())}"
95+
query = f"SELECT * FROM `{table_id}`"
96+
job = self.client.load_table_from_dataframe(df, table_id, job_config=job_config)
97+
job.result()
98+
99+
# create FeatureView
100+
fv = FeatureView(
101+
name="test_bq_query_correctness",
102+
entities=["driver_id"],
103+
features=[Feature("value", ValueType.FLOAT)],
104+
ttl=timedelta(minutes=5),
105+
input=BigQuerySource(
106+
event_timestamp_column="ts",
107+
created_timestamp_column="created_ts",
108+
field_mapping={"ts_1": "ts", "id": "driver_id"},
109+
date_partition_column="",
110+
query=query,
111+
),
112+
)
113+
config = RepoConfig(
114+
metadata_store="./metadata.db",
115+
project=f"test_bq_query_correctness_{int(time.time())}",
116+
provider="gcp",
117+
)
118+
fs = FeatureStore(config=config)
119+
fs.apply([fv])
120+
121+
# run materialize()
122+
fs.materialize(
123+
[fv.name],
124+
datetime.utcnow() - timedelta(minutes=5),
125+
datetime.utcnow() - timedelta(minutes=0),
126+
)
127+
128+
# check result of materialize()
129+
response_dict = fs.get_online_features(
130+
[f"{fv.name}:value"], [{"driver_id": 1}]
131+
).to_dict()
132+
assert abs(response_dict[f"{fv.name}:value"][0] - 0.3) < 1e-6

0 commit comments

Comments (0)