diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d7c4624..30fed93 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,7 +14,12 @@ Change Log Unreleased ********** +[5.4.0] - 2023-08-28 +******************** +Changed +======= * Changed ordering of certain context assignments in producer code. +* Adds custom exceptions for producing and consuming errors. [5.3.1] - 2023-08-10 ******************** diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index 8d5a511..be17940 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '5.3.1' +__version__ = '5.4.0' diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 24367ca..9cb050a 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -85,6 +85,12 @@ def __init__(self, message: str, causes: list): self.causes = causes # just used for testing +class EventConsumptionException(Exception): + """ + Indicates that we had an issue in event production. Useful for filtering on later. + """ + + def _reconnect_to_db_if_needed(): """ Reconnects the db connection if needed. @@ -513,8 +519,8 @@ def record_event_consuming_error(self, run_context, error, maybe_message): try: # This is gross, but our record_exception wrapper doesn't take args at the moment, # and will only read the exception from stack context. - raise Exception(error) - except BaseException: + raise EventConsumptionException(error) + except EventConsumptionException: self._add_message_monitoring(run_context=run_context, message=maybe_kafka_message, error=error) record_exception() logger.exception( diff --git a/edx_event_bus_kafka/internal/producer.py b/edx_event_bus_kafka/internal/producer.py index 3125027..499bfd9 100644 --- a/edx_event_bus_kafka/internal/producer.py +++ b/edx_event_bus_kafka/internal/producer.py @@ -38,6 +38,10 @@ confluent_kafka = None +class EventProductionException(Exception): + """ An exception we can check for when errors occur in event production code. """ + + def record_producing_error(error, context): """ Record an error in producing an event to both the monitoring system and the regular logs @@ -49,8 +53,8 @@ def record_producing_error(error, context): try: # record_exception() is a wrapper around a New Relic method that can only be called within an except block, # so first re-raise the error - raise Exception(error) - except BaseException: + raise EventProductionException(error) + except EventProductionException: record_exception() logger.exception(f"Error delivering message to Kafka event bus. {error=!s} {context!r}")