1515from feast .data_source import BigQuerySource , DataSource
1616from feast .errors import FeastProviderLoginError
1717from feast .feature_view import FeatureView
18- from feast .infra .offline_stores .offline_store import OfflineStore
18+ from feast .infra .offline_stores .offline_store import OfflineStore , RetrievalJob
1919from feast .infra .provider import (
2020 DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL ,
21- RetrievalJob ,
2221 _get_requested_feature_views_to_features_dict ,
2322)
2423from feast .registry import Registry
@@ -52,14 +51,15 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
5251class BigQueryOfflineStore (OfflineStore ):
5352 @staticmethod
5453 def pull_latest_from_table_or_query (
54+ config : RepoConfig ,
5555 data_source : DataSource ,
5656 join_key_columns : List [str ],
5757 feature_name_columns : List [str ],
5858 event_timestamp_column : str ,
5959 created_timestamp_column : Optional [str ],
6060 start_date : datetime ,
6161 end_date : datetime ,
62- ) -> pyarrow . Table :
62+ ) -> RetrievalJob :
6363 assert isinstance (data_source , BigQuerySource )
6464 from_expression = data_source .get_table_query_string ()
6565
@@ -74,6 +74,7 @@ def pull_latest_from_table_or_query(
7474 timestamp_desc_string = " DESC, " .join (timestamps ) + " DESC"
7575 field_string = ", " .join (join_key_columns + feature_name_columns + timestamps )
7676
77+ client = _get_bigquery_client (project = config .offline_store .project_id )
7778 query = f"""
7879 SELECT { field_string }
7980 FROM (
@@ -84,14 +85,7 @@ def pull_latest_from_table_or_query(
8485 )
8586 WHERE _feast_row = 1
8687 """
87-
88- return BigQueryOfflineStore ._pull_query (query )
89-
90- @staticmethod
91- def _pull_query (query : str ) -> pyarrow .Table :
92- client = _get_bigquery_client ()
93- query_job = client .query (query )
94- return query_job .to_arrow ()
88+ return BigQueryRetrievalJob (query = query , client = client , config = config )
9589
9690 @staticmethod
9791 def get_historical_features (
@@ -104,19 +98,18 @@ def get_historical_features(
10498 full_feature_names : bool = False ,
10599 ) -> RetrievalJob :
106100 # TODO: Add entity_df validation in order to fail before interacting with BigQuery
101+ assert isinstance (config .offline_store , BigQueryOfflineStoreConfig )
107102
108- client = _get_bigquery_client ()
109-
103+ client = _get_bigquery_client (project = config .offline_store .project_id )
110104 expected_join_keys = _get_join_keys (project , feature_views , registry )
111105
112106 assert isinstance (config .offline_store , BigQueryOfflineStoreConfig )
113- dataset_project = config .offline_store .project_id or client .project
114107
115108 table = _upload_entity_df_into_bigquery (
116109 client = client ,
117110 project = config .project ,
118111 dataset_name = config .offline_store .dataset ,
119- dataset_project = dataset_project ,
112+ dataset_project = client . project ,
120113 entity_df = entity_df ,
121114 )
122115
@@ -265,10 +258,7 @@ def _block_until_done():
265258 if not job_config :
266259 today = date .today ().strftime ("%Y%m%d" )
267260 rand_id = str (uuid .uuid4 ())[:7 ]
268- dataset_project = (
269- self .config .offline_store .project_id or self .client .project
270- )
271- path = f"{ dataset_project } .{ self .config .offline_store .dataset } .historical_{ today } _{ rand_id } "
261+ path = f"{ self .client .project } .{ self .config .offline_store .dataset } .historical_{ today } _{ rand_id } "
272262 job_config = bigquery .QueryJobConfig (destination = path )
273263
274264 bq_job = self .client .query (self .query , job_config = job_config )
@@ -287,6 +277,9 @@ def _block_until_done():
287277 print (f"Done writing to '{ job_config .destination } '." )
288278 return str (job_config .destination )
289279
280+ def to_table (self ) -> pyarrow .Table :
281+ return self .client .query (self .query ).to_arrow ()
282+
290283
291284@dataclass (frozen = True )
292285class FeatureViewQueryContext :
@@ -451,9 +444,9 @@ def build_point_in_time_query(
451444 return query
452445
453446
454- def _get_bigquery_client ():
447+ def _get_bigquery_client (project : Optional [ str ] = None ):
455448 try :
456- client = bigquery .Client ()
449+ client = bigquery .Client (project = project )
457450 except DefaultCredentialsError as e :
458451 raise FeastProviderLoginError (
459452 str (e )
0 commit comments