Skip to content

Commit 437b350

Browse files
voonhous authored and feast-ci-bot committed
Rebasing changes (feast-dev#355)
1 parent 98ea901 commit 437b350

File tree

9 files changed

+857
-299
lines changed

9 files changed

+857
-299
lines changed

.prow/config.yaml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,18 +145,18 @@ presubmits:
145145
postsubmits:
146146
gojek/feast:
147147
- name: publish-python-sdk
148-
decorate: true
148+
decorate: true
149149
spec:
150150
containers:
151151
- image: python:3
152152
command:
153153
- sh
154-
- -c
154+
- -c
155155
- |
156156
.prow/scripts/publish-python-sdk.sh \
157157
--directory-path sdk/python --repository pypi
158158
volumeMounts:
159-
- name: pypirc
159+
- name: pypirc
160160
mountPath: /root/.pypirc
161161
subPath: .pypirc
162162
readOnly: true
@@ -170,7 +170,7 @@ postsubmits:
170170
- ^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$
171171

172172
- name: publish-docker-images
173-
decorate: true
173+
decorate: true
174174
spec:
175175
containers:
176176
- image: google/cloud-sdk:273.0.0
@@ -182,14 +182,14 @@ postsubmits:
182182
--archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \
183183
--output-dir $PWD/
184184
185-
if [ $PULL_BASE_REF == "master" ]; then
186-
185+
if [ $PULL_BASE_REF == "master" ]; then
186+
187187
.prow/scripts/publish-docker-image.sh \
188188
--repository gcr.io/kf-feast/feast-core \
189189
--tag dev \
190190
--file infra/docker/core/Dockerfile \
191191
--google-service-account-file /etc/gcloud/service-account.json
192-
192+
193193
.prow/scripts/publish-docker-image.sh \
194194
--repository gcr.io/kf-feast/feast-serving \
195195
--tag dev \
@@ -203,13 +203,13 @@ postsubmits:
203203
docker push gcr.io/kf-feast/feast-serving:${PULL_BASE_SHA}
204204
205205
else
206-
206+
207207
.prow/scripts/publish-docker-image.sh \
208208
--repository gcr.io/kf-feast/feast-core \
209209
--tag ${PULL_BASE_REF:1} \
210210
--file infra/docker/core/Dockerfile \
211211
--google-service-account-file /etc/gcloud/service-account.json
212-
212+
213213
.prow/scripts/publish-docker-image.sh \
214214
--repository gcr.io/kf-feast/feast-serving \
215215
--tag ${PULL_BASE_REF:1} \
@@ -244,7 +244,7 @@ postsubmits:
244244
- ^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$
245245

246246
- name: publish-helm-chart
247-
decorate: true
247+
decorate: true
248248
spec:
249249
containers:
250250
- image: google/cloud-sdk:273.0.0-slim
@@ -253,7 +253,7 @@ postsubmits:
253253
- -c
254254
- |
255255
gcloud auth activate-service-account --key-file /etc/gcloud/service-account.json
256-
256+
257257
curl -s https://get.helm.sh/helm-v2.16.1-linux-amd64.tar.gz | tar -C /tmp -xz
258258
mv /tmp/linux-amd64/helm /usr/bin/helm
259259
helm init --client-only

sdk/__init__.py

Whitespace-only changes.

sdk/python/feast/client.py

Lines changed: 140 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515

1616
import logging
1717
import os
18-
import sys
18+
import time
1919
from collections import OrderedDict
2020
from typing import Dict, Union
2121
from typing import List
22+
2223
import grpc
23-
import time
2424
import pandas as pd
2525
import pyarrow as pa
2626
import pyarrow.parquet as pq
@@ -35,11 +35,12 @@
3535
)
3636
from feast.core.CoreService_pb2_grpc import CoreServiceStub
3737
from feast.core.FeatureSet_pb2 import FeatureSetStatus
38-
from feast.exceptions import format_grpc_exception
3938
from feast.feature_set import FeatureSet, Entity
4039
from feast.job import Job
40+
from feast.loaders.abstract_producer import get_producer
4141
from feast.loaders.file import export_dataframe_to_staging_location
42-
from feast.loaders.ingest import ingest_table_to_kafka
42+
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
43+
from feast.loaders.ingest import get_feature_row_chunks
4344
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
4445
from feast.serving.ServingService_pb2 import (
4546
GetOnlineFeaturesRequest,
@@ -259,7 +260,7 @@ def _apply_feature_set(self, feature_set: FeatureSet):
259260
print(f"No change detected or applied: {feature_set.name}")
260261

261262
# Deep copy from the returned feature set to the local feature set
262-
feature_set.update_from_feature_set(applied_fs)
263+
feature_set._update_from_feature_set(applied_fs)
263264

264265
def list_feature_sets(self) -> List[FeatureSet]:
265266
"""
@@ -472,35 +473,55 @@ def get_online_features(
472473
) # type: GetOnlineFeaturesResponse
473474

474475
def ingest(
475-
self,
476-
feature_set: Union[str, FeatureSet],
477-
source: Union[pd.DataFrame, str],
478-
version: int = None,
479-
force_update: bool = False,
480-
max_workers: int = CPU_COUNT,
481-
disable_progress_bar: bool = False,
482-
chunk_size: int = 5000,
483-
timeout: int = None,
484-
):
476+
self,
477+
feature_set: Union[str, FeatureSet],
478+
source: Union[pd.DataFrame, str],
479+
chunk_size: int = 10000,
480+
version: int = None,
481+
force_update: bool = False,
482+
max_workers: int = max(CPU_COUNT - 1, 1),
483+
disable_progress_bar: bool = False,
484+
timeout: int = KAFKA_CHUNK_PRODUCTION_TIMEOUT
485+
) -> None:
485486
"""
486487
Loads feature data into Feast for a specific feature set.
487488
488489
Args:
489-
feature_set: Name of feature set or a feature set object
490-
source: Either a file path or Pandas Dataframe to ingest into Feast
490+
feature_set (typing.Union[str, FeatureSet]):
491+
Feature set object or the string name of the feature set
492+
(without a version).
493+
494+
source (typing.Union[pd.DataFrame, str]):
495+
Either a file path or Pandas Dataframe to ingest into Feast
491496
Files that are currently supported:
492-
* parquet
493-
* csv
494-
* json
495-
version: Feature set version
496-
force_update: Automatically update feature set based on source data
497-
prior to ingesting. This will also register changes to Feast
498-
max_workers: Number of worker processes to use to encode values
499-
disable_progress_bar: Disable printing of progress statistics
500-
chunk_size: Maximum amount of rows to load into memory and ingest at
501-
a time
502-
timeout: Seconds to wait before ingestion times out
497+
* parquet
498+
* csv
499+
* json
500+
501+
chunk_size (int):
502+
Amount of rows to load and ingest at a time.
503+
504+
version (int):
505+
Feature set version.
506+
507+
force_update (bool):
508+
Automatically update feature set based on source data prior to
509+
ingesting. This will also register changes to Feast.
510+
511+
max_workers (int):
512+
Number of worker processes to use to encode values.
513+
514+
disable_progress_bar (bool):
515+
Disable printing of progress statistics.
516+
517+
timeout (int):
518+
Timeout in seconds to wait for completion.
519+
520+
Returns:
521+
None:
522+
None
503523
"""
524+
504525
if isinstance(feature_set, FeatureSet):
505526
name = feature_set.name
506527
if version is None:
@@ -510,15 +531,21 @@ def ingest(
510531
else:
511532
raise Exception(f"Feature set name must be provided")
512533

513-
table = _read_table_from_source(source)
534+
# Read table and get row count
535+
tmp_table_name = _read_table_from_source(
536+
source, chunk_size, max_workers
537+
)
538+
539+
pq_file = pq.ParquetFile(tmp_table_name)
514540

515-
# Update the feature set based on DataFrame schema
516-
if force_update:
517-
# Use a small as reference DataFrame to infer fields
518-
ref_df = table.to_batches(max_chunksize=20)[0].to_pandas()
541+
row_count = pq_file.metadata.num_rows
519542

520-
feature_set.infer_fields_from_df(
521-
ref_df, discard_unused_fields=True, replace_existing_features=True
543+
# Update the feature set based on PyArrow table of first row group
544+
if force_update:
545+
feature_set.infer_fields_from_pa(
546+
table=pq_file.read_row_group(0),
547+
discard_unused_fields=True,
548+
replace_existing_features=True
522549
)
523550
self.apply(feature_set)
524551
current_time = time.time()
@@ -538,22 +565,49 @@ def ingest(
538565
if timeout is not None:
539566
timeout = timeout - int(time.time() - current_time)
540567

541-
if feature_set.source.source_type == "Kafka":
542-
print("Ingesting to kafka...")
543-
ingest_table_to_kafka(
544-
feature_set=feature_set,
545-
table=table,
546-
max_workers=max_workers,
547-
disable_pbar=disable_progress_bar,
548-
chunk_size=chunk_size,
549-
timeout=timeout,
550-
)
551-
else:
552-
raise Exception(
553-
f"Could not determine source type for feature set "
554-
f'"{feature_set.name}" with source type '
555-
f'"{feature_set.source.source_type}"'
556-
)
568+
try:
569+
# Kafka configs
570+
brokers = feature_set.get_kafka_source_brokers()
571+
topic = feature_set.get_kafka_source_topic()
572+
producer = get_producer(brokers, row_count, disable_progress_bar)
573+
574+
# Loop optimization declarations
575+
produce = producer.produce
576+
flush = producer.flush
577+
578+
# Transform and push data to Kafka
579+
if feature_set.source.source_type == "Kafka":
580+
for chunk in get_feature_row_chunks(
581+
file=tmp_table_name,
582+
row_groups=list(range(pq_file.num_row_groups)),
583+
fs=feature_set,
584+
max_workers=max_workers):
585+
586+
# Push FeatureRow one chunk at a time to kafka
587+
for serialized_row in chunk:
588+
produce(topic=topic, value=serialized_row)
589+
590+
# Force a flush after each chunk
591+
flush(timeout=timeout)
592+
593+
# Remove chunk from memory
594+
del chunk
595+
596+
else:
597+
raise Exception(
598+
f"Could not determine source type for feature set "
599+
f'"{feature_set.name}" with source type '
600+
f'"{feature_set.source.source_type}"'
601+
)
602+
603+
# Print ingestion statistics
604+
producer.print_results()
605+
finally:
606+
# Remove parquet file(s) that were created earlier
607+
print("Removing temporary file(s)...")
608+
os.remove(tmp_table_name)
609+
610+
return None
557611

558612

559613
def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]:
@@ -583,18 +637,38 @@ def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest
583637
return list(feature_set_request.values())
584638

585639

586-
def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:
640+
def _read_table_from_source(
641+
source: Union[pd.DataFrame, str],
642+
chunk_size: int,
643+
max_workers: int
644+
) -> str:
587645
"""
588646
Infers a data source type (path or Pandas Dataframe) and reads it in as
589647
a PyArrow Table.
590648
649+
The PyArrow Table that is read will be written to a parquet file with row
650+
group size determined by the minimum of:
651+
* (table.num_rows / max_workers)
652+
* chunk_size
653+
654+
The parquet file that is created will be passed as file path to the
655+
multiprocessing pool workers.
656+
591657
Args:
592-
source: Either a string path or Pandas Dataframe
658+
source (Union[pd.DataFrame, str]):
659+
Either a string path or Pandas DataFrame.
660+
661+
chunk_size (int):
662+
Number of worker processes to use to encode values.
663+
664+
max_workers (int):
665+
Amount of rows to load and ingest at a time.
593666
594667
Returns:
595-
PyArrow table
668+
str: Path to parquet file that was created.
596669
"""
597-
# Pandas dataframe detected
670+
671+
# Pandas DataFrame detected
598672
if isinstance(source, pd.DataFrame):
599673
table = pa.Table.from_pandas(df=source)
600674

@@ -618,4 +692,14 @@ def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:
618692

619693
# Ensure that PyArrow table is initialised
620694
assert isinstance(table, pa.lib.Table)
621-
return table
695+
696+
# Write table as parquet file with a specified row_group_size
697+
tmp_table_name = f"{int(time.time())}.parquet"
698+
row_group_size = min(int(table.num_rows/max_workers), chunk_size)
699+
pq.write_table(table=table, where=tmp_table_name,
700+
row_group_size=row_group_size)
701+
702+
# Remove table from memory
703+
del table
704+
705+
return tmp_table_name

0 commit comments

Comments (0)