Skip to content

Commit 31b9ca4

Browse files
committed
Improved metrics during dataframe ingestion
1 parent 5856895 commit 31b9ca4

2 files changed

Lines changed: 69 additions & 21 deletions

File tree

sdk/python/feast/client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ def ingest(
464464
max_workers: int = CPU_COUNT,
465465
disable_progress_bar: bool = False,
466466
chunk_size: int = 5000,
467+
timeout: int = None,
467468
):
468469
"""
469470
Loads data into Feast for a specific feature set.
@@ -485,6 +486,7 @@ def ingest(
485486
chunk_size: Number of rows per chunk to encode before ingesting to
486487
Feast
487488
"""
489+
488490
if isinstance(feature_set, FeatureSet):
489491
name = feature_set.name
490492
if version is None:
@@ -508,8 +510,9 @@ def ingest(
508510
feature_set=feature_set,
509511
dataframe=dataframe,
510512
max_workers=max_workers,
511-
disable_progress_bar=disable_progress_bar,
513+
disable_pbar=disable_progress_bar,
512514
chunk_size=chunk_size,
515+
timeout=timeout,
513516
)
514517
else:
515518
raise Exception(

sdk/python/feast/loaders/ingest.py

Lines changed: 65 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
import math
3+
import multiprocessing
34
import os
45
import time
56
import numpy as np
@@ -20,27 +21,50 @@
2021
FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str
2122
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300
2223
CPU_COUNT = os.cpu_count() # type: int
24+
KAFKA_CHUNK_PRODUCTION_TIMEOUT = 120 # type: int
2325

2426

2527
def _kafka_feature_row_chunk_producer(
26-
feature_row_chunk_queue: Queue, chunk_count: int, brokers, topic, progress_bar: tqdm
28+
feature_row_chunk_queue: Queue,
29+
chunk_count: int,
30+
brokers,
31+
topic,
32+
ctx: dict,
33+
pbar: tqdm,
2734
):
28-
processed_chunks = 0
29-
rows_processed = 0
35+
# Callback for failed production to Kafka
36+
def on_error(e):
37+
# Save last exception
38+
ctx["last_exception"] = e
39+
40+
# Increment error count
41+
if "error_count" in ctx:
42+
ctx["error_count"] += 1
43+
else:
44+
ctx["error_count"] = 1
45+
46+
# Callback for succeeded production to Kafka
47+
def on_success(meta):
48+
pbar.update()
49+
3050
producer = KafkaProducer(bootstrap_servers=brokers)
51+
processed_chunks = 0
52+
3153
while processed_chunks < chunk_count:
3254
if feature_row_chunk_queue.empty():
3355
time.sleep(0.1)
3456
else:
3557
feature_rows = feature_row_chunk_queue.get()
36-
rows_processed += len(feature_rows)
3758
for row in feature_rows:
38-
progress_bar.update()
39-
producer.send(topic, row.SerializeToString())
40-
41-
producer.flush()
42-
progress_bar.refresh()
59+
producer.send(topic, row.SerializeToString()).add_callback(
60+
on_success
61+
).add_errback(on_error)
62+
producer.flush(timeout=KAFKA_CHUNK_PRODUCTION_TIMEOUT)
4363
processed_chunks += 1
64+
pbar.refresh()
65+
# Using progress bar as counter is much faster than incrementing dict
66+
ctx["success_count"] = pbar.n
67+
pbar.close()
4468

4569

4670
def _encode_chunk(df: pd.DataFrame, feature_set: FeatureSet):
@@ -52,12 +76,11 @@ def ingest_kafka(
5276
feature_set: FeatureSet,
5377
dataframe: pd.DataFrame,
5478
max_workers: int,
79+
timeout: int = None,
5580
chunk_size: int = 5000,
56-
disable_progress_bar: bool = False,
81+
disable_pbar: bool = False,
5782
):
58-
progress_bar = tqdm(
59-
unit="rows", total=dataframe.shape[0], disable=disable_progress_bar
60-
)
83+
pbar = tqdm(unit="rows", total=dataframe.shape[0], disable=disable_pbar)
6184

6285
# Validate feature set schema
6386
validate_dataframe(dataframe, feature_set)
@@ -66,23 +89,30 @@ def ingest_kafka(
6689
num_chunks = max(dataframe.shape[0] / max(chunk_size, 100), 1)
6790
df_chunks = np.array_split(dataframe, num_chunks)
6891

69-
# Create queue through which encoding and ingestion will coordinate
92+
# Create queue through which encoding and production will coordinate
7093
chunk_queue = Queue()
7194

72-
# Start ingestion process to push feature rows to Kafka
95+
# Create a context object to send and receive information across processes
96+
ctx = multiprocessing.Manager().dict(
97+
{"success_count": 0, "error_count": 0, "last_exception": ""}
98+
)
99+
100+
# Create producer to push feature rows to Kafka
73101
ingestion_process = Process(
74102
target=_kafka_feature_row_chunk_producer,
75103
args=(
76104
chunk_queue,
77105
num_chunks,
78106
feature_set.get_kafka_source_brokers(),
79107
feature_set.get_kafka_source_topic(),
80-
progress_bar,
108+
ctx,
109+
pbar,
81110
),
82111
)
83112

84113
try:
85114
# Start ingestion process
115+
print(f"\nIngestion started for {feature_set.name}:{feature_set.version}")
86116
ingestion_process.start()
87117

88118
# Create a pool of workers to convert df chunks into feature row chunks
@@ -95,17 +125,32 @@ def ingest_kafka(
95125
_encode_chunk,
96126
zip(df_chunks[chunks_done:chunks_to], repeat(feature_set)),
97127
)
128+
129+
# Push feature row encoded chunks onto queue
98130
for result in results.get():
99131
chunk_queue.put(result)
100132
chunks_done += max_workers
101133
except Exception as ex:
102134
_logger.error(f"Exception occurred: {ex}")
103135
finally:
104-
ingestion_process.join()
105-
rows_ingested = progress_bar.total
106-
progress_bar.close()
136+
# Wait for ingestion to complete, or time out
137+
ingestion_process.join(timeout=timeout)
138+
failed_message = (
139+
""
140+
if ctx["error_count"] == 0
141+
else f"\nFail: {ctx['error_count']}/{dataframe.shape[0]}"
142+
)
143+
144+
last_exception_message = (
145+
""
146+
if ctx["last_exception"] == ""
147+
else f"\nLast exception:\n{ctx['last_exception']}"
148+
)
107149
print(
108-
f"\nIngested {rows_ingested} rows into {feature_set.name}:{feature_set.version}"
150+
f"\nIngestion statistics:"
151+
f"\nSuccess: {ctx['success_count']}/{dataframe.shape[0]}"
152+
f"{failed_message}"
153+
f"{last_exception_message}"
109154
)
110155

111156

0 commit comments

Comments (0)