1313# limitations under the License.
1414import logging
1515import os
16+ import sys
1617from collections import OrderedDict
1718from typing import Dict , Union
1819from typing import List
20+ from urllib .parse import urlparse
21+
1922import grpc
2023import pandas as pd
21- from feast .loaders .ingest import ingest_kafka
22-
23- from feast .exceptions import format_grpc_exception
24+ import pyarrow as pa
25+ import pyarrow .parquet as pq
2426from feast .core .CoreService_pb2 import (
2527 GetFeastCoreVersionRequest ,
2628 ListFeatureSetsResponse ,
3133 GetFeatureSetResponse ,
3234)
3335from feast .core .CoreService_pb2_grpc import CoreServiceStub
36+ from feast .exceptions import format_grpc_exception
3437from feast .feature_set import FeatureSet , Entity
3538from feast .job import Job
39+ from feast .loaders .file import export_dataframe_to_staging_location
40+ from feast .loaders .ingest import ingest_table_to_kafka
41+ from feast .serving .ServingService_pb2 import GetFeastServingInfoResponse
3642from feast .serving .ServingService_pb2 import (
3743 GetOnlineFeaturesRequest ,
3844 GetBatchFeaturesRequest ,
4450 FeastServingType ,
4551)
4652from feast .serving .ServingService_pb2_grpc import ServingServiceStub
47- from feast .serving .ServingService_pb2 import GetFeastServingInfoResponse
48- from urllib .parse import urlparse
49- import uuid
50- import numpy as np
51- import sys
52- from feast .loaders .file import export_dataframe_to_staging_location
5353
5454_logger = logging .getLogger (__name__ )
5555
@@ -317,7 +317,7 @@ def get_batch_features(
317317
318318 Returns:
319319 Feast batch retrieval job: feast.job.Job
320-
320+
321321 Example usage:
322322 ============================================================
323323 >>> from feast import Client
@@ -458,7 +458,7 @@ def get_online_features(
458458 def ingest (
459459 self ,
460460 feature_set : Union [str , FeatureSet ],
461- dataframe : pd .DataFrame ,
461+ source : Union [ pd .DataFrame , str ] ,
462462 version : int = None ,
463463 force_update : bool = False ,
464464 max_workers : int = CPU_COUNT ,
@@ -471,20 +471,21 @@ def ingest(
471471
472472 :param feature_set: (str, FeatureSet) Feature set object or the
473473 string name of the feature set (without a version)
474- :param dataframe:
475- Pandas dataframe to load into Feast for this feature set
476- :param
477- version: (int) Version of the feature set for which this ingestion
478- should happen
479- :param force_update: (bool) Automatically update
480- feature set based on data frame before ingesting data
481- :param max_workers: Number of
482- worker processes to use to encode the dataframe
483- :param
484- disable_progress_bar: Disable progress bar during ingestion
485- :param
486- chunk_size: Number of rows per chunk to encode before ingesting to
487- Feast
474+ :param source:
475+ Either a file path or Pandas Dataframe to ingest into Feast
476+ Files that are currently supported:
477+ * parquet
478+ * csv
479+ * json
480+
481+ :param version: Feature set version
482+ :param force_update: (bool) Automatically update feature set based on
483+ source data prior to ingesting. This will also register changes to Feast
484+ :param max_workers: Number of worker processes to use to encode values
485+ :param disable_progress_bar: Disable printing of progress statistics
486+ :param timeout: Time in seconds before ingestion times out
487+ :param chunk_size: Amount of rows to load and ingest at a time
488+
488489 """
489490
490491 if isinstance (feature_set , FeatureSet ):
@@ -496,19 +497,24 @@ def ingest(
496497 else :
497498 raise Exception (f"Feature set name must be provided" )
498499
499- feature_set = self . get_feature_set ( name , version , fail_if_missing = True )
500+ table = _read_table_from_source ( source )
500501
501- # Update the feature set based on dataframe schema
502+ # Update the feature set based on DataFrame schema
502503 if force_update :
504+ # Use a small as reference DataFrame to infer fields
505+ ref_df = table .to_batches (max_chunksize = 20 )[0 ].to_pandas ()
506+
503507 feature_set .infer_fields_from_df (
504- dataframe , discard_unused_fields = True , replace_existing_features = True
508+ ref_df , discard_unused_fields = True , replace_existing_features = True
505509 )
506510 self .apply (feature_set )
507511
512+ feature_set = self .get_feature_set (name , version , fail_if_missing = True )
513+
508514 if feature_set .source .source_type == "Kafka" :
509- ingest_kafka (
515+ ingest_table_to_kafka (
510516 feature_set = feature_set ,
511- dataframe = dataframe ,
517+ table = table ,
512518 max_workers = max_workers ,
513519 disable_pbar = disable_progress_bar ,
514520 chunk_size = chunk_size ,
@@ -542,3 +548,39 @@ def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest
542548 )
543549 feature_set_request [feature_set ].feature_names .append (feature )
544550 return list (feature_set_request .values ())
551+
552+
def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:
    """
    Infers a data source type (path or Pandas DataFrame) and reads it in as
    a PyArrow Table.

    :param source: Either a string file path or a Pandas DataFrame.
        File formats currently supported (selected by extension):
        * parquet (the default when the extension is not csv/json)
        * csv
        * json
    :return: PyArrow table containing the source data
    :raises ValueError: If source is neither a DataFrame nor a string path
    """

    # Pandas DataFrame detected
    if isinstance(source, pd.DataFrame):
        table = pa.Table.from_pandas(df=source)

    # Inferring a string path
    elif isinstance(source, str):
        file_path = source
        # Only the extension is needed to pick a reader; the stem is unused.
        _, file_ext = os.path.splitext(file_path)

        if ".csv" in file_ext:
            from pyarrow import csv

            # BUGFIX: read from the full path, not the extension-stripped
            # stem returned by os.path.splitext (e.g. "data" for "data.csv").
            table = csv.read_csv(file_path)
        elif ".json" in file_ext:
            from pyarrow import json

            table = json.read_json(file_path)
        else:
            # Fall back to parquet for any other (or missing) extension
            table = pq.read_table(file_path)
    else:
        raise ValueError(f"Unknown data source provided for ingestion: {source}")

    # Ensure that PyArrow table is initialised
    assert isinstance(table, pa.lib.Table)

    return table
0 commit comments