Skip to content

Commit

Permalink
Kafka producer should raise an exception when it fails to connect to …
Browse files Browse the repository at this point in the history
…broker (#636)

* Added an exception raised by flush when produce fails or when messages remain in the queue, with a unit test.

* Fix: Throw exception in callback

* Removed error_count property in abstract_producer

Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>
  • Loading branch information
junhui096 and woop authored May 14, 2020
1 parent dfc81b9 commit 561b621
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
34 changes: 10 additions & 24 deletions sdk/python/feast/loaders/abstract_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class AbstractProducer:
def __init__(self, brokers: str, row_count: int, disable_progress_bar: bool):
self.brokers = brokers
self.row_count = row_count
self.error_count = 0
self.last_exception = ""

# Progress bar will always display average rate
self.pbar = tqdm(
Expand All @@ -45,8 +43,7 @@ def _inc_pbar(self, meta):
self.pbar.update(1)

def _set_error(self, exception: str):
self.error_count += 1
self.last_exception = exception
raise Exception(exception)

def print_results(self) -> None:
"""
Expand All @@ -62,24 +59,7 @@ def print_results(self) -> None:

print("Ingestion complete!")

failed_message = (
""
if self.error_count == 0
else f"\nFail: {self.error_count / self.row_count}"
)

last_exception_message = (
""
if self.last_exception == ""
else f"\nLast exception:\n{self.last_exception}"
)

print(
f"\nIngestion statistics:"
f"\nSuccess: {self.pbar.n}/{self.row_count}"
f"{failed_message}"
f"{last_exception_message}"
)
print(f"\nIngestion statistics:" f"\nSuccess: {self.pbar.n}/{self.row_count}")
return None


Expand Down Expand Up @@ -129,7 +109,10 @@ def flush(self, timeout: Optional[int]):
Returns:
int: Number of messages still in queue.
"""
return self.producer.flush(timeout=timeout)
messages = self.producer.flush(timeout=timeout)
if messages:
raise Exception("Not all Kafka messages are successfully delivered.")
return messages

def _delivery_callback(self, err: str, msg) -> None:
"""
Expand Down Expand Up @@ -200,7 +183,10 @@ def flush(self, timeout: Optional[int]):
KafkaTimeoutError: failure to flush buffered records within the
provided timeout
"""
return self.producer.flush(timeout=timeout)
messages = self.producer.flush(timeout=timeout)
if messages:
raise Exception("Not all Kafka messages are successfully delivered.")
return messages


def get_producer(
Expand Down
32 changes: 32 additions & 0 deletions sdk/python/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,38 @@ def test_feature_set_ingest_success(self, dataframe, test_client, mocker):
# Ingest data into Feast
test_client.ingest("driver-feature-set", dataframe)

@pytest.mark.parametrize(
"dataframe,test_client,exception",
[(dataframes.GOOD, pytest.lazy_fixture("client"), Exception)],
)
def test_feature_set_ingest_throws_exception_if_kafka_down(
self, dataframe, test_client, exception, mocker
):

test_client.set_project("project1")
driver_fs = FeatureSet(
"driver-feature-set",
source=KafkaSource(brokers="localhost:4412", topic="test"),
)
driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT))
driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING))
driver_fs.add(Feature(name="feature_3", dtype=ValueType.INT64))
driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64))

# Register with Feast core
test_client.apply(driver_fs)
driver_fs = driver_fs.to_proto()
driver_fs.meta.status = FeatureSetStatusProto.STATUS_READY

mocker.patch.object(
test_client._core_service_stub,
"GetFeatureSet",
return_value=GetFeatureSetResponse(feature_set=driver_fs),
)

with pytest.raises(exception):
test_client.ingest("driver-feature-set", dataframe)

@pytest.mark.parametrize(
"dataframe,exception,test_client",
[
Expand Down

0 comments on commit 561b621

Please sign in to comment.