|
26 | 26 | from feast.registry import Registry |
27 | 27 | from feast.repo_config import FeastConfigBaseModel, RepoConfig |
28 | 28 |
|
| 29 | +from ...usage import log_exceptions_and_usage |
29 | 30 | from .bigquery_source import BigQuerySource |
30 | 31 |
|
31 | 32 | try: |
@@ -62,6 +63,7 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel): |
62 | 63 |
|
63 | 64 | class BigQueryOfflineStore(OfflineStore): |
64 | 65 | @staticmethod |
| 66 | + @log_exceptions_and_usage(offline_store="bigquery") |
65 | 67 | def pull_latest_from_table_or_query( |
66 | 68 | config: RepoConfig, |
67 | 69 | data_source: DataSource, |
@@ -113,6 +115,7 @@ def pull_latest_from_table_or_query( |
113 | 115 | ) |
114 | 116 |
|
115 | 117 | @staticmethod |
| 118 | + @log_exceptions_and_usage(offline_store="bigquery") |
116 | 119 | def get_historical_features( |
117 | 120 | config: RepoConfig, |
118 | 121 | feature_views: List[FeatureView], |
@@ -221,7 +224,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]: |
221 | 224 |
|
222 | 225 | def _to_df_internal(self) -> pd.DataFrame: |
223 | 226 | with self._query_generator() as query: |
224 | | - df = self.client.query(query).to_dataframe(create_bqstorage_client=True) |
| 227 | + df = self._execute_query(query).to_dataframe(create_bqstorage_client=True) |
225 | 228 | return df |
226 | 229 |
|
227 | 230 | def to_sql(self) -> str: |
@@ -265,24 +268,29 @@ def to_bigquery( |
265 | 268 | return str(job_config.destination) |
266 | 269 |
|
267 | 270 | with self._query_generator() as query: |
268 | | - bq_job = self.client.query(query, job_config=job_config) |
269 | | - |
270 | | - if job_config.dry_run: |
271 | | - print( |
272 | | - "This query will process {} bytes.".format( |
273 | | - bq_job.total_bytes_processed |
274 | | - ) |
275 | | - ) |
276 | | - return None |
277 | | - |
278 | | - block_until_done(client=self.client, bq_job=bq_job, timeout=timeout) |
| 271 | + self._execute_query(query, job_config, timeout) |
279 | 272 |
|
280 | 273 | print(f"Done writing to '{job_config.destination}'.") |
281 | 274 | return str(job_config.destination) |
282 | 275 |
|
283 | 276 | def _to_arrow_internal(self) -> pyarrow.Table: |
284 | 277 | with self._query_generator() as query: |
285 | | - return self.client.query(query).to_arrow() |
| 278 | + return self._execute_query(query).to_arrow() |
| 279 | + |
| 280 | + @log_exceptions_and_usage |
| 281 | + def _execute_query( |
| 282 | + self, query, job_config=None, timeout: int = 1800 |
| 283 | +    ) -> Optional[bigquery.job.query.QueryJob]:
| 284 | + bq_job = self.client.query(query, job_config=job_config) |
| 285 | + |
| 286 | + if job_config and job_config.dry_run: |
| 287 | + print( |
| 288 | + "This query will process {} bytes.".format(bq_job.total_bytes_processed) |
| 289 | + ) |
| 290 | + return None |
| 291 | + |
| 292 | + block_until_done(client=self.client, bq_job=bq_job, timeout=timeout) |
| 293 | + return bq_job |
286 | 294 |
|
287 | 295 |
|
288 | 296 | def block_until_done( |
|
0 commit comments