|
1 | 1 | import configparser |
2 | 2 | import os |
3 | 3 | import uuid |
4 | | -from datetime import datetime |
| 4 | +from datetime import datetime, timedelta |
5 | 5 | from itertools import groupby |
6 | 6 | from typing import Dict, List, Optional, Union, cast |
7 | 7 |
|
8 | 8 | import pandas as pd |
9 | 9 | import redis |
10 | 10 | from croniter import croniter |
| 11 | +from google.cloud import bigquery |
11 | 12 |
|
12 | 13 | import feast |
13 | 14 | from feast.config import Config |
14 | 15 | from feast.constants import ConfigOptions as feast_opt |
15 | 16 | from feast.data_source import BigQuerySource, FileSource |
16 | 17 | from feast.grpc.grpc import create_grpc_channel |
17 | | -from feast.staging.entities import ( |
18 | | - stage_entities_to_bq, |
19 | | - stage_entities_to_fs, |
20 | | - table_reference_from_string, |
21 | | -) |
| 18 | +from feast.staging.entities import stage_entities_to_fs, table_reference_from_string |
22 | 19 | from feast_spark.api.JobService_pb2 import ( |
23 | 20 | GetHealthMetricsRequest, |
24 | 21 | GetHistoricalFeaturesRequest, |
|
51 | 48 | ) |
52 | 49 |
|
53 | 50 |
|
def stage_entities_to_bq_with_partition(
    entity_source: pd.DataFrame, project: str, dataset: str
) -> BigQuerySource:
    """
    Stores the given entity dataframe as a new day-partitioned table in BigQuery.

    The table name is generated from the current time, and the table is set to
    expire one day after creation so staged entity data does not accumulate.

    Args:
        entity_source: Dataframe containing an ``event_timestamp`` column to stage.
        project: GCP project that will own the staging table.
        dataset: BigQuery dataset (within ``project``) to create the table in.

    Returns:
        BigQuerySource referencing the newly created table.
    """

    bq_client: bigquery.Client = bigquery.Client()
    destination = bigquery.TableReference(
        bigquery.DatasetReference(project, dataset),
        # %S (zero-padded seconds) is portable; the previous %s is a
        # platform-specific C-library extension (epoch seconds on glibc,
        # unsupported on Windows).
        f"_entities_{datetime.now():%Y%m%d%H%M%S}",
    )

    # Work on a copy so the caller's dataframe is not mutated, then floor
    # timestamps to ms to prevent a ns -> ms casting exception inside pyarrow.
    entity_source = entity_source.copy()
    entity_source["event_timestamp"] = entity_source["event_timestamp"].dt.floor("ms")

    load_job_config = bigquery.LoadJobConfig(
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY, field="event_timestamp",
        )
    )
    load_job: bigquery.LoadJob = bq_client.load_table_from_dataframe(
        entity_source, destination, job_config=load_job_config,
    )
    load_job.result()  # wait until the load completes before referencing the table

    # Set a 1-day expiration so the temporary staging table cleans itself up.
    dest_table: bigquery.Table = bq_client.get_table(destination)
    dest_table.expires = datetime.now() + timedelta(days=1)
    bq_client.update_table(dest_table, fields=["expires"])

    return BigQuerySource(
        event_timestamp_column="event_timestamp",
        table_ref=f"{destination.project}:{destination.dataset_id}.{destination.table_id}",
    )
| 87 | + |
| 88 | + |
54 | 89 | class Client: |
55 | 90 | _feast: feast.Client |
56 | 91 |
|
@@ -197,7 +232,7 @@ def get_historical_features( |
197 | 232 | staging_bq_project = source_ref.project |
198 | 233 | staging_bq_dataset = source_ref.dataset_id |
199 | 234 |
|
200 | | - entity_source = stage_entities_to_bq( |
| 235 | + entity_source = stage_entities_to_bq_with_partition( |
201 | 236 | entity_source, staging_bq_project, staging_bq_dataset |
202 | 237 | ) |
203 | 238 | else: |
|
0 commit comments