Skip to content

Commit 561b621

Browse files
junhui096woop
andauthored
Kafka producer should raise an exception when it fails to connect to broker (feast-dev#636)
* Added exception to flush when produce fails with unit test or with messages in queue. * Fix: Throw exception in callback * Removed error_count property in abstract_producer Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>
1 parent dfc81b9 commit 561b621

File tree

2 files changed

+42
-24
lines changed

2 files changed

+42
-24
lines changed

sdk/python/feast/loaders/abstract_producer.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ class AbstractProducer:
2525
def __init__(self, brokers: str, row_count: int, disable_progress_bar: bool):
2626
self.brokers = brokers
2727
self.row_count = row_count
28-
self.error_count = 0
29-
self.last_exception = ""
3028

3129
# Progress bar will always display average rate
3230
self.pbar = tqdm(
@@ -45,8 +43,7 @@ def _inc_pbar(self, meta):
4543
self.pbar.update(1)
4644

4745
def _set_error(self, exception: str):
48-
self.error_count += 1
49-
self.last_exception = exception
46+
raise Exception(exception)
5047

5148
def print_results(self) -> None:
5249
"""
@@ -62,24 +59,7 @@ def print_results(self) -> None:
6259

6360
print("Ingestion complete!")
6461

65-
failed_message = (
66-
""
67-
if self.error_count == 0
68-
else f"\nFail: {self.error_count / self.row_count}"
69-
)
70-
71-
last_exception_message = (
72-
""
73-
if self.last_exception == ""
74-
else f"\nLast exception:\n{self.last_exception}"
75-
)
76-
77-
print(
78-
f"\nIngestion statistics:"
79-
f"\nSuccess: {self.pbar.n}/{self.row_count}"
80-
f"{failed_message}"
81-
f"{last_exception_message}"
82-
)
62+
print(f"\nIngestion statistics:" f"\nSuccess: {self.pbar.n}/{self.row_count}")
8363
return None
8464

8565

@@ -129,7 +109,10 @@ def flush(self, timeout: Optional[int]):
129109
Returns:
130110
int: Number of messages still in queue.
131111
"""
132-
return self.producer.flush(timeout=timeout)
112+
messages = self.producer.flush(timeout=timeout)
113+
if messages:
114+
raise Exception("Not all Kafka messages are successfully delivered.")
115+
return messages
133116

134117
def _delivery_callback(self, err: str, msg) -> None:
135118
"""
@@ -200,7 +183,10 @@ def flush(self, timeout: Optional[int]):
200183
KafkaTimeoutError: failure to flush buffered records within the
201184
provided timeout
202185
"""
203-
return self.producer.flush(timeout=timeout)
186+
messages = self.producer.flush(timeout=timeout)
187+
if messages:
188+
raise Exception("Not all Kafka messages are successfully delivered.")
189+
return messages
204190

205191

206192
def get_producer(

sdk/python/tests/test_client.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,38 @@ def test_feature_set_ingest_success(self, dataframe, test_client, mocker):
601601
# Ingest data into Feast
602602
test_client.ingest("driver-feature-set", dataframe)
603603

604+
@pytest.mark.parametrize(
605+
"dataframe,test_client,exception",
606+
[(dataframes.GOOD, pytest.lazy_fixture("client"), Exception)],
607+
)
608+
def test_feature_set_ingest_throws_exception_if_kafka_down(
609+
self, dataframe, test_client, exception, mocker
610+
):
611+
612+
test_client.set_project("project1")
613+
driver_fs = FeatureSet(
614+
"driver-feature-set",
615+
source=KafkaSource(brokers="localhost:4412", topic="test"),
616+
)
617+
driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT))
618+
driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING))
619+
driver_fs.add(Feature(name="feature_3", dtype=ValueType.INT64))
620+
driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64))
621+
622+
# Register with Feast core
623+
test_client.apply(driver_fs)
624+
driver_fs = driver_fs.to_proto()
625+
driver_fs.meta.status = FeatureSetStatusProto.STATUS_READY
626+
627+
mocker.patch.object(
628+
test_client._core_service_stub,
629+
"GetFeatureSet",
630+
return_value=GetFeatureSetResponse(feature_set=driver_fs),
631+
)
632+
633+
with pytest.raises(exception):
634+
test_client.ingest("driver-feature-set", dataframe)
635+
604636
@pytest.mark.parametrize(
605637
"dataframe,exception,test_client",
606638
[

0 commit comments

Comments
 (0)