1515import multiprocessing
1616import os
1717import shutil
18- import tempfile
1918import uuid
2019from datetime import datetime
2120from itertools import groupby
2221from typing import Any , Dict , List , Optional , Union
23- from urllib .parse import urlparse
2422
2523import grpc
2624import pandas as pd
10199 GetOnlineFeaturesRequestV2 ,
102100)
103101from feast .serving .ServingService_pb2_grpc import ServingServiceStub
104- from feast .staging .storage_client import get_staging_client
102+ from feast .staging .entities import (
103+ stage_entities_to_bq ,
104+ stage_entities_to_fs ,
105+ table_reference_from_string ,
106+ )
105107
106108_logger = logging .getLogger (__name__ )
107109
@@ -855,6 +857,7 @@ def get_online_features(
855857 entity_rows = _infer_online_entity_rows (entity_rows ),
856858 project = project if project is not None else self .project ,
857859 ),
860+ timeout = self ._config .getint (CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY ),
858861 metadata = self ._get_grpc_metadata (),
859862 )
860863 except grpc .RpcError as e :
@@ -879,8 +882,11 @@ def get_historical_features(
879882 "feature_table:feature" where "feature_table" & "feature" refer to
880883 the feature and feature table names respectively.
881884 entity_source (Union[pd.DataFrame, FileSource, BigQuerySource]): Source for the entity rows.
882- If entity_source is a Panda DataFrame, the dataframe will be exported to the staging
883- location as parquet file. It is also assumed that the column event_timestamp is present
885+ If entity_source is a Pandas DataFrame, the dataframe will be staged
886+ to become accessible by spark workers.
887+ If one of the feature tables' sources is in BigQuery, entities will be uploaded to BQ.
888+ Otherwise, entities will be uploaded to remote file storage (derived from the configured staging location).
889+ It is also assumed that the column event_timestamp is present
884890 in the dataframe, and is of type datetime without timezone information.
885891
886892 The user needs to make sure that the source (or staging location, if entity_source is
@@ -916,25 +922,27 @@ def get_historical_features(
916922 str (uuid .uuid4 ()),
917923 )
918924 output_format = self ._config .get (CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT )
925+ feature_sources = [
926+ feature_table .batch_source for feature_table in feature_tables
927+ ]
919928
920929 if isinstance (entity_source , pd .DataFrame ):
921- staging_location = self ._config .get (CONFIG_SPARK_STAGING_LOCATION )
922- entity_staging_uri = urlparse (
923- os .path .join (staging_location , str (uuid .uuid4 ()))
924- )
925- staging_client = get_staging_client (entity_staging_uri .scheme )
926- with tempfile .NamedTemporaryFile () as df_export_path :
927- entity_source .to_parquet (df_export_path .name )
928- bucket = (
929- None
930- if entity_staging_uri .scheme == "file"
931- else entity_staging_uri .netloc
930+ if any (isinstance (source , BigQuerySource ) for source in feature_sources ):
931+ first_bq_source = [
932+ source
933+ for source in feature_sources
934+ if isinstance (source , BigQuerySource )
935+ ][0 ]
936+ source_ref = table_reference_from_string (
937+ first_bq_source .bigquery_options .table_ref
932938 )
933- staging_client . upload_file (
934- df_export_path . name , bucket , entity_staging_uri . path . lstrip ( "/" )
939+ entity_source = stage_entities_to_bq (
940+ entity_source , source_ref . project , source_ref . dataset_id
935941 )
936- entity_source = FileSource (
937- "event_timestamp" , ParquetFormat (), entity_staging_uri .geturl (),
942+ else :
943+ entity_source = stage_entities_to_fs (
944+ entity_source ,
945+ staging_location = self ._config .get (CONFIG_SPARK_STAGING_LOCATION ),
938946 )
939947
940948 if self ._use_job_service :
@@ -943,6 +951,7 @@ def get_historical_features(
943951 feature_refs = feature_refs ,
944952 entity_source = entity_source .to_proto (),
945953 project = project ,
954+ output_format = output_format ,
946955 output_location = output_location ,
947956 ),
948957 ** self ._extra_grpc_params (),
@@ -955,11 +964,7 @@ def get_historical_features(
955964 )
956965 else :
957966 return start_historical_feature_retrieval_job (
958- self ,
959- entity_source ,
960- feature_tables ,
961- output_format ,
962- os .path .join (output_location , str (uuid .uuid4 ())),
967+ self , entity_source , feature_tables , output_format , output_location ,
963968 )
964969
965970 def get_historical_features_df (
0 commit comments