11import logging
22import math
3+ import multiprocessing
34import os
45import time
56import numpy as np
# Environment variable holding the Feast Core endpoint URL.
FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL"  # type: str
# Maximum time to wait for a batch feature request before giving up.
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300
# Number of CPUs available for the encoding worker pool.
CPU_COUNT = os.cpu_count()  # type: int
# Upper bound (seconds) on flushing one chunk of rows to Kafka.
KAFKA_CHUNK_PRODUCTION_TIMEOUT = 120  # type: int
2325
2426
def _kafka_feature_row_chunk_producer(
    feature_row_chunk_queue: Queue,
    chunk_count: int,
    brokers,
    topic,
    ctx: dict,
    pbar: tqdm,
):
    """Publish chunks of feature rows from a queue to a Kafka topic.

    Runs in a child process: repeatedly pulls encoded feature-row chunks off
    ``feature_row_chunk_queue`` and sends each row to ``topic`` on ``brokers``,
    mirroring per-row progress into ``pbar`` and outcome counters into ``ctx``.

    Args:
        feature_row_chunk_queue: Queue of lists of protobuf feature rows
            (each row must support ``SerializeToString``).
        chunk_count: Number of chunks expected on the queue before exiting.
        brokers: Kafka bootstrap servers passed to ``KafkaProducer``.
        topic: Kafka topic to produce to.
        ctx: Shared (``multiprocessing.Manager``) dict used to report
            ``success_count``, ``error_count`` and ``last_exception`` back
            to the parent process.
        pbar: Progress bar; its counter ``pbar.n`` doubles as the success
            count.
    """

    # Callback for failed production to Kafka
    def on_error(e):
        # Save last exception so the parent process can surface it
        ctx["last_exception"] = e

        # Increment error count
        if "error_count" in ctx:
            ctx["error_count"] += 1
        else:
            ctx["error_count"] = 1

    # Callback for succeeded production to Kafka
    def on_success(meta):
        pbar.update()

    producer = KafkaProducer(bootstrap_servers=brokers)
    processed_chunks = 0

    try:
        while processed_chunks < chunk_count:
            if feature_row_chunk_queue.empty():
                # Chunks are produced by a separate encoding pool and may
                # lag behind — poll briefly rather than spin.
                time.sleep(0.1)
            else:
                feature_rows = feature_row_chunk_queue.get()
                for row in feature_rows:
                    producer.send(topic, row.SerializeToString()).add_callback(
                        on_success
                    ).add_errback(on_error)
                # Block until this chunk is delivered; success/error
                # callbacks fire during the flush.
                producer.flush(timeout=KAFKA_CHUNK_PRODUCTION_TIMEOUT)
                processed_chunks += 1
                pbar.refresh()
    finally:
        # Always release the producer and report results — even if a send
        # or flush raised — so the parent still sees partial progress.
        producer.close()
        # Using progress bar as counter is much faster than incrementing dict
        ctx["success_count"] = pbar.n
        pbar.close()
4468
4569
4670def _encode_chunk (df : pd .DataFrame , feature_set : FeatureSet ):
@@ -52,12 +76,11 @@ def ingest_kafka(
5276 feature_set : FeatureSet ,
5377 dataframe : pd .DataFrame ,
5478 max_workers : int ,
79+ timeout : int = None ,
5580 chunk_size : int = 5000 ,
56- disable_progress_bar : bool = False ,
81+ disable_pbar : bool = False ,
5782):
58- progress_bar = tqdm (
59- unit = "rows" , total = dataframe .shape [0 ], disable = disable_progress_bar
60- )
83+ pbar = tqdm (unit = "rows" , total = dataframe .shape [0 ], disable = disable_pbar )
6184
6285 # Validate feature set schema
6386 validate_dataframe (dataframe , feature_set )
@@ -66,23 +89,30 @@ def ingest_kafka(
6689 num_chunks = max (dataframe .shape [0 ] / max (chunk_size , 100 ), 1 )
6790 df_chunks = np .array_split (dataframe , num_chunks )
6891
69- # Create queue through which encoding and ingestion will coordinate
92+ # Create queue through which encoding and production will coordinate
7093 chunk_queue = Queue ()
7194
72- # Start ingestion process to push feature rows to Kafka
95+ # Create a context object to send and receive information across processes
96+ ctx = multiprocessing .Manager ().dict (
97+ {"success_count" : 0 , "error_count" : 0 , "last_exception" : "" }
98+ )
99+
100+ # Create producer to push feature rows to Kafka
73101 ingestion_process = Process (
74102 target = _kafka_feature_row_chunk_producer ,
75103 args = (
76104 chunk_queue ,
77105 num_chunks ,
78106 feature_set .get_kafka_source_brokers (),
79107 feature_set .get_kafka_source_topic (),
80- progress_bar ,
108+ ctx ,
109+ pbar ,
81110 ),
82111 )
83112
84113 try :
85114 # Start ingestion process
115+ print (f"\n Ingestion started for { feature_set .name } :{ feature_set .version } " )
86116 ingestion_process .start ()
87117
88118 # Create a pool of workers to convert df chunks into feature row chunks
@@ -95,17 +125,32 @@ def ingest_kafka(
95125 _encode_chunk ,
96126 zip (df_chunks [chunks_done :chunks_to ], repeat (feature_set )),
97127 )
128+
129+ # Push feature row encoded chunks onto queue
98130 for result in results .get ():
99131 chunk_queue .put (result )
100132 chunks_done += max_workers
101133 except Exception as ex :
102134 _logger .error (f"Exception occurred: { ex } " )
103135 finally :
104- ingestion_process .join ()
105- rows_ingested = progress_bar .total
106- progress_bar .close ()
136+ # Wait for ingestion to complete, or time out
137+ ingestion_process .join (timeout = timeout )
138+ failed_message = (
139+ ""
140+ if ctx ["error_count" ] == 0
141+ else f"\n Fail: { ctx ['error_count' ]} /{ dataframe .shape [0 ]} "
142+ )
143+
144+ last_exception_message = (
145+ ""
146+ if ctx ["last_exception" ] == ""
147+ else f"\n Last exception:\n { ctx ['last_exception' ]} "
148+ )
107149 print (
108- f"\n Ingested { rows_ingested } rows into { feature_set .name } :{ feature_set .version } "
150+ f"\n Ingestion statistics:"
151+ f"\n Success: { ctx ['success_count' ]} /{ dataframe .shape [0 ]} "
152+ f"{ failed_message } "
153+ f"{ last_exception_message } "
109154 )
110155
111156
0 commit comments