1515
1616import logging
1717import os
18- import sys
18+ import time
1919from collections import OrderedDict
2020from typing import Dict , Union
2121from typing import List
22+
2223import grpc
23- import time
2424import pandas as pd
2525import pyarrow as pa
2626import pyarrow .parquet as pq
3535)
3636from feast .core .CoreService_pb2_grpc import CoreServiceStub
3737from feast .core .FeatureSet_pb2 import FeatureSetStatus
38- from feast .exceptions import format_grpc_exception
3938from feast .feature_set import FeatureSet , Entity
4039from feast .job import Job
40+ from feast .loaders .abstract_producer import get_producer
4141from feast .loaders .file import export_dataframe_to_staging_location
42- from feast .loaders .ingest import ingest_table_to_kafka
42+ from feast .loaders .ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
43+ from feast .loaders .ingest import get_feature_row_chunks
4344from feast .serving .ServingService_pb2 import GetFeastServingInfoResponse
4445from feast .serving .ServingService_pb2 import (
4546 GetOnlineFeaturesRequest ,
@@ -259,7 +260,7 @@ def _apply_feature_set(self, feature_set: FeatureSet):
259260 print (f"No change detected or applied: { feature_set .name } " )
260261
261262 # Deep copy from the returned feature set to the local feature set
262- feature_set .update_from_feature_set (applied_fs )
263+ feature_set ._update_from_feature_set (applied_fs )
263264
264265 def list_feature_sets (self ) -> List [FeatureSet ]:
265266 """
@@ -472,35 +473,55 @@ def get_online_features(
472473 ) # type: GetOnlineFeaturesResponse
473474
474475 def ingest (
475- self ,
476- feature_set : Union [str , FeatureSet ],
477- source : Union [pd .DataFrame , str ],
478- version : int = None ,
479- force_update : bool = False ,
480- max_workers : int = CPU_COUNT ,
481- disable_progress_bar : bool = False ,
482- chunk_size : int = 5000 ,
483- timeout : int = None ,
484- ):
476+ self ,
477+ feature_set : Union [str , FeatureSet ],
478+ source : Union [pd .DataFrame , str ],
479+ chunk_size : int = 10000 ,
480+ version : int = None ,
481+ force_update : bool = False ,
482+ max_workers : int = max ( CPU_COUNT - 1 , 1 ) ,
483+ disable_progress_bar : bool = False ,
484+ timeout : int = KAFKA_CHUNK_PRODUCTION_TIMEOUT
485+ ) -> None :
485486 """
486487 Loads feature data into Feast for a specific feature set.
487488
488489 Args:
489- feature_set: Name of feature set or a feature set object
490- source: Either a file path or Pandas Dataframe to ingest into Feast
490+ feature_set (typing.Union[str, FeatureSet]):
491+ Feature set object or the string name of the feature set
492+ (without a version).
493+
494+ source (typing.Union[pd.DataFrame, str]):
495+ Either a file path or Pandas Dataframe to ingest into Feast
491496 Files that are currently supported:
492- * parquet
493- * csv
494- * json
495- version: Feature set version
496- force_update: Automatically update feature set based on source data
497- prior to ingesting. This will also register changes to Feast
498- max_workers: Number of worker processes to use to encode values
499- disable_progress_bar: Disable printing of progress statistics
500- chunk_size: Maximum amount of rows to load into memory and ingest at
501- a time
502- timeout: Seconds to wait before ingestion times out
497+ * parquet
498+ * csv
499+ * json
500+
501+ chunk_size (int):
502+ Amount of rows to load and ingest at a time.
503+
504+ version (int):
505+ Feature set version.
506+
507+ force_update (bool):
508+ Automatically update feature set based on source data prior to
509+ ingesting. This will also register changes to Feast.
510+
511+ max_workers (int):
512+ Number of worker processes to use to encode values.
513+
514+ disable_progress_bar (bool):
515+ Disable printing of progress statistics.
516+
517+ timeout (int):
518+ Timeout in seconds to wait for completion.
519+
520+ Returns:
521+ None:
522+ None
503523 """
524+
504525 if isinstance (feature_set , FeatureSet ):
505526 name = feature_set .name
506527 if version is None :
@@ -510,15 +531,21 @@ def ingest(
510531 else :
511532 raise Exception (f"Feature set name must be provided" )
512533
513- table = _read_table_from_source (source )
534+ # Read table and get row count
535+ tmp_table_name = _read_table_from_source (
536+ source , chunk_size , max_workers
537+ )
538+
539+ pq_file = pq .ParquetFile (tmp_table_name )
514540
515- # Update the feature set based on DataFrame schema
516- if force_update :
517- # Use a small as reference DataFrame to infer fields
518- ref_df = table .to_batches (max_chunksize = 20 )[0 ].to_pandas ()
541+ row_count = pq_file .metadata .num_rows
519542
520- feature_set .infer_fields_from_df (
521- ref_df , discard_unused_fields = True , replace_existing_features = True
543+ # Update the feature set based on PyArrow table of first row group
544+ if force_update :
545+ feature_set .infer_fields_from_pa (
546+ table = pq_file .read_row_group (0 ),
547+ discard_unused_fields = True ,
548+ replace_existing_features = True
522549 )
523550 self .apply (feature_set )
524551 current_time = time .time ()
@@ -538,22 +565,49 @@ def ingest(
538565 if timeout is not None :
539566 timeout = timeout - int (time .time () - current_time )
540567
541- if feature_set .source .source_type == "Kafka" :
542- print ("Ingesting to kafka..." )
543- ingest_table_to_kafka (
544- feature_set = feature_set ,
545- table = table ,
546- max_workers = max_workers ,
547- disable_pbar = disable_progress_bar ,
548- chunk_size = chunk_size ,
549- timeout = timeout ,
550- )
551- else :
552- raise Exception (
553- f"Could not determine source type for feature set "
554- f'"{ feature_set .name } " with source type '
555- f'"{ feature_set .source .source_type } "'
556- )
568+ try :
569+ # Kafka configs
570+ brokers = feature_set .get_kafka_source_brokers ()
571+ topic = feature_set .get_kafka_source_topic ()
572+ producer = get_producer (brokers , row_count , disable_progress_bar )
573+
574+ # Loop optimization declarations
575+ produce = producer .produce
576+ flush = producer .flush
577+
578+ # Transform and push data to Kafka
579+ if feature_set .source .source_type == "Kafka" :
580+ for chunk in get_feature_row_chunks (
581+ file = tmp_table_name ,
582+ row_groups = list (range (pq_file .num_row_groups )),
583+ fs = feature_set ,
584+ max_workers = max_workers ):
585+
586+ # Push FeatureRow one chunk at a time to kafka
587+ for serialized_row in chunk :
588+ produce (topic = topic , value = serialized_row )
589+
590+ # Force a flush after each chunk
591+ flush (timeout = timeout )
592+
593+ # Remove chunk from memory
594+ del chunk
595+
596+ else :
597+ raise Exception (
598+ f"Could not determine source type for feature set "
599+ f'"{ feature_set .name } " with source type '
600+ f'"{ feature_set .source .source_type } "'
601+ )
602+
603+ # Print ingestion statistics
604+ producer .print_results ()
605+ finally :
606+ # Remove parquet file(s) that were created earlier
607+ print ("Removing temporary file(s)..." )
608+ os .remove (tmp_table_name )
609+
610+ return None
557611
558612
559613def _build_feature_set_request (feature_ids : List [str ]) -> List [FeatureSetRequest ]:
@@ -583,18 +637,38 @@ def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest
583637 return list (feature_set_request .values ())
584638
585639
586- def _read_table_from_source (source : Union [pd .DataFrame , str ]) -> pa .lib .Table :
640+ def _read_table_from_source (
641+ source : Union [pd .DataFrame , str ],
642+ chunk_size : int ,
643+ max_workers : int
644+ ) -> str :
587645 """
588646 Infers a data source type (path or Pandas Dataframe) and reads it in as
589647 a PyArrow Table.
590648
649+ The PyArrow Table that is read will be written to a parquet file with row
650+ group size determined by the minimum of:
651+ * (table.num_rows / max_workers)
652+ * chunk_size
653+
654+ The parquet file that is created will be passed as file path to the
655+ multiprocessing pool workers.
656+
591657 Args:
592- source: Either a string path or Pandas Dataframe
658+ source (Union[pd.DataFrame, str]):
659+ Either a string path or Pandas DataFrame.
660+
661+ chunk_size (int):
662+ Amount of rows to load and ingest at a time.
663+
664+ max_workers (int):
665+ Number of worker processes to use to encode values.
593666
594667 Returns:
595- PyArrow table
668+ str: Path to parquet file that was created.
596669 """
597- # Pandas dataframe detected
670+
671+ # Pandas DataFrame detected
598672 if isinstance (source , pd .DataFrame ):
599673 table = pa .Table .from_pandas (df = source )
600674
@@ -618,4 +692,14 @@ def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:
618692
619693 # Ensure that PyArrow table is initialised
620694 assert isinstance (table , pa .lib .Table )
621- return table
695+
696+ # Write table as parquet file with a specified row_group_size
697+ tmp_table_name = f"{ int (time .time ())} .parquet"
698+ row_group_size = min (int (table .num_rows / max_workers ), chunk_size )
699+ pq .write_table (table = table , where = tmp_table_name ,
700+ row_group_size = row_group_size )
701+
702+ # Remove table from memory
703+ del table
704+
705+ return tmp_table_name
0 commit comments