Skip to content

Commit 2abeabd

Browse files
voonhous authored and feast-ci-bot committed
Add minimal implementation of ingesting Parquet and CSV files (feast-dev#327)
* [WIP] Ingest Parquet in chunks * [WIP] Added iterator to create chunk of feature rows given pyarrow table * [WIP] Removed unused ingest_file in ingest.py * [WIP] Standardised docstring to use reStructuredText format with type * [WIP] Adding missing required module (pyarrow) * [WIP] Removed circular dependency to fix tests * [WIP] Removed circular dependency (Removed import FeatureSet) * [WIP] Reverting changes * [WIP] Re-implementing ingestion of parquet and csv files * Upgrade type inference to use value types instead of column types * Fix bug in applying a new feature set from a dataframe not returning a feature set * Upgrade ingestion to use a queue * Implement column value type inference * Add e2e test for file based ingestion * [WIP] Changing string for easier debugging * Merge ingest and ingest_file methods on Feast client * Fixed typo in typing * Remove dataframe keyword from tests * Add all_types_parquet.yaml * Fix broken tests for new ingestion client
1 parent 60db2ff commit 2abeabd

10 files changed

Lines changed: 482 additions & 172 deletions

File tree

sdk/python/feast/client.py

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@
1313
# limitations under the License.
1414
import logging
1515
import os
16+
import sys
1617
from collections import OrderedDict
1718
from typing import Dict, Union
1819
from typing import List
20+
from urllib.parse import urlparse
21+
1922
import grpc
2023
import pandas as pd
21-
from feast.loaders.ingest import ingest_kafka
22-
23-
from feast.exceptions import format_grpc_exception
24+
import pyarrow as pa
25+
import pyarrow.parquet as pq
2426
from feast.core.CoreService_pb2 import (
2527
GetFeastCoreVersionRequest,
2628
ListFeatureSetsResponse,
@@ -31,8 +33,12 @@
3133
GetFeatureSetResponse,
3234
)
3335
from feast.core.CoreService_pb2_grpc import CoreServiceStub
36+
from feast.exceptions import format_grpc_exception
3437
from feast.feature_set import FeatureSet, Entity
3538
from feast.job import Job
39+
from feast.loaders.file import export_dataframe_to_staging_location
40+
from feast.loaders.ingest import ingest_table_to_kafka
41+
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
3642
from feast.serving.ServingService_pb2 import (
3743
GetOnlineFeaturesRequest,
3844
GetBatchFeaturesRequest,
@@ -44,12 +50,6 @@
4450
FeastServingType,
4551
)
4652
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
47-
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
48-
from urllib.parse import urlparse
49-
import uuid
50-
import numpy as np
51-
import sys
52-
from feast.loaders.file import export_dataframe_to_staging_location
5353

5454
_logger = logging.getLogger(__name__)
5555

@@ -317,7 +317,7 @@ def get_batch_features(
317317
318318
Returns:
319319
Feast batch retrieval job: feast.job.Job
320-
320+
321321
Example usage:
322322
============================================================
323323
>>> from feast import Client
@@ -458,7 +458,7 @@ def get_online_features(
458458
def ingest(
459459
self,
460460
feature_set: Union[str, FeatureSet],
461-
dataframe: pd.DataFrame,
461+
source: Union[pd.DataFrame, str],
462462
version: int = None,
463463
force_update: bool = False,
464464
max_workers: int = CPU_COUNT,
@@ -471,20 +471,21 @@ def ingest(
471471
472472
:param feature_set: (str, FeatureSet) Feature set object or the
473473
string name of the feature set (without a version)
474-
:param dataframe:
475-
Pandas dataframe to load into Feast for this feature set
476-
:param
477-
version: (int) Version of the feature set for which this ingestion
478-
should happen
479-
:param force_update: (bool) Automatically update
480-
feature set based on data frame before ingesting data
481-
:param max_workers: Number of
482-
worker processes to use to encode the dataframe
483-
:param
484-
disable_progress_bar: Disable progress bar during ingestion
485-
:param
486-
chunk_size: Number of rows per chunk to encode before ingesting to
487-
Feast
474+
:param source:
475+
Either a file path or Pandas Dataframe to ingest into Feast
476+
Files that are currently supported:
477+
* parquet
478+
* csv
479+
* json
480+
481+
:param version: Feature set version
482+
:param force_update: (bool) Automatically update feature set based on
483+
source data prior to ingesting. This will also register changes to Feast
484+
:param max_workers: Number of worker processes to use to encode values
485+
:param disable_progress_bar: Disable printing of progress statistics
486+
:param timeout: Time in seconds before ingestion times out
487+
:param chunk_size: Amount of rows to load and ingest at a time
488+
488489
"""
489490

490491
if isinstance(feature_set, FeatureSet):
@@ -496,19 +497,24 @@ def ingest(
496497
else:
497498
raise Exception(f"Feature set name must be provided")
498499

499-
feature_set = self.get_feature_set(name, version, fail_if_missing=True)
500+
table = _read_table_from_source(source)
500501

501-
# Update the feature set based on dataframe schema
502+
# Update the feature set based on DataFrame schema
502503
if force_update:
504+
# Use a small as reference DataFrame to infer fields
505+
ref_df = table.to_batches(max_chunksize=20)[0].to_pandas()
506+
503507
feature_set.infer_fields_from_df(
504-
dataframe, discard_unused_fields=True, replace_existing_features=True
508+
ref_df, discard_unused_fields=True, replace_existing_features=True
505509
)
506510
self.apply(feature_set)
507511

512+
feature_set = self.get_feature_set(name, version, fail_if_missing=True)
513+
508514
if feature_set.source.source_type == "Kafka":
509-
ingest_kafka(
515+
ingest_table_to_kafka(
510516
feature_set=feature_set,
511-
dataframe=dataframe,
517+
table=table,
512518
max_workers=max_workers,
513519
disable_pbar=disable_progress_bar,
514520
chunk_size=chunk_size,
@@ -542,3 +548,39 @@ def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest
542548
)
543549
feature_set_request[feature_set].feature_names.append(feature)
544550
return list(feature_set_request.values())
551+
552+
553+
def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:
    """
    Infers a data source type (file path or Pandas DataFrame) and reads it
    in as a PyArrow Table.

    Supported file formats for string paths:
        * parquet (the default when the extension is not .csv or .json)
        * csv
        * json

    :param source: Either a string file path or a Pandas DataFrame
    :return: PyArrow table
    :raises ValueError: If source is neither a DataFrame nor a string path
    """

    # Pandas DataFrame detected
    if isinstance(source, pd.DataFrame):
        table = pa.Table.from_pandas(df=source)

    # Inferring a string path
    elif isinstance(source, str):
        file_path = source
        # splitext returns (path-without-extension, extension); only the
        # extension is needed for dispatch — always read from file_path.
        filename, file_ext = os.path.splitext(file_path)

        if ".csv" in file_ext:
            from pyarrow import csv

            # BUG FIX: read the full path, not the extension-stripped
            # "filename" component returned by os.path.splitext
            table = csv.read_csv(file_path)
        elif ".json" in file_ext:
            from pyarrow import json

            # BUG FIX: same as above — use file_path, not filename
            table = json.read_json(file_path)
        else:
            # Fall back to Parquet for any other (or missing) extension
            table = pq.read_table(file_path)
    else:
        raise ValueError(f"Unknown data source provided for ingestion: {source}")

    # Ensure that PyArrow table is initialised
    assert isinstance(table, pa.lib.Table)

    return table

sdk/python/feast/feature_set.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ def infer_fields_from_df(
223223
replace_existing_features: bool = False,
224224
replace_existing_entities: bool = False,
225225
discard_unused_fields: bool = False,
226+
rows_to_sample: int = 100,
226227
):
227228
"""
228229
Adds fields (Features or Entities) to a feature set based on the schema
@@ -318,8 +319,10 @@ def infer_fields_from_df(
318319

319320
# Store this field as a feature
320321
new_fields[column] = Feature(
321-
name=column, dtype=pandas_dtype_to_feast_value_type(df[column].dtype)
322+
name=column,
323+
dtype=self._infer_pd_column_type(column, df[column], rows_to_sample),
322324
)
325+
323326
output_log += f"{type(new_fields[column]).__name__} {new_fields[column].name} ({new_fields[column].dtype}) added from dataframe.\n"
324327

325328
# Discard unused fields from feature set
@@ -336,6 +339,34 @@ def infer_fields_from_df(
336339
self._fields = new_fields
337340
print(output_log)
338341

342+
def _infer_pd_column_type(self, column, series, rows_to_sample):
    """
    Infer the Feast value type of a DataFrame column by sampling its rows.

    :param column: Name of the column (used for error reporting)
    :param series: Pandas Series holding the column's values
    :param rows_to_sample: Maximum number of rows to inspect
    :return: The inferred value type, or None if the series is empty
    :raises ValueError: If two sampled rows yield conflicting types
    """
    dtype = None
    sample_count = 0

    # Loop over rows of this column to infer a value type per row
    for key, value in series.items():
        sample_count += 1
        # Stop sampling at the row limit. BUG FIX: the original used
        # `continue`, which still scanned every remaining row for nothing;
        # `break` ends the loop once the sample budget is spent.
        if sample_count > rows_to_sample:
            break

        # Infer the specific type for this row
        current_dtype = pandas_dtype_to_feast_value_type(name=column, value=value)

        # Make sure the type is consistent across the column
        if dtype:
            if dtype != current_dtype:
                raise ValueError(
                    f"Type mismatch detected in column {column}. Both "
                    f"the types {current_dtype} and {dtype} "
                    f"have been found."
                )
        else:
            # First sampled row establishes the column's type
            dtype = current_dtype

    return dtype
369+
339370
def _update_from_feature_set(self, feature_set, is_dirty: bool = True):
340371

341372
self.name = feature_set.name

0 commit comments

Comments
 (0)