From e8bf01a8ffd9cbf7ccf34420d6135b6ef885051a Mon Sep 17 00:00:00 2001 From: Robert Raposa Date: Wed, 20 Jul 2022 12:59:31 -0400 Subject: [PATCH 1/2] docs: replace TODOs with comments Replace some TODOs with comments about what could be done in the future if and when it is needed. --- edx_event_bus_kafka/publishing/event_producer.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 06c6eff..c8bcbd1 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -67,7 +67,8 @@ def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict: Returns: Schema for some field - TODO: Move to openedx_events.event_bus.avro.serializer? + Note: Avro helpers could be moved to openedx_events.event_bus.avro.serializer to be + used for other event bus implementations other than Kafka. """ subschema = serializer_schema for field_name in field_path: @@ -114,9 +115,6 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: return AvroSignalSerializer(signal) -# TODO: Cache this, but in a way that still allows changes to settings -# via remote-config (and in particular does not result in mixed -# cache/uncached configuration). def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]: """ Create the producer for a signal and a key field path. @@ -127,6 +125,11 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) - signal: The OpenEdxPublicSignal to make a producer for event_key_field: Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend) + + Performance note: + This could be cached, but requires care such that it allows changes to settings via + remote-config (and in particular does not result in mixed cache/uncached configuration). + This complexity is being deferred until this becomes a performance issue. """ if schema_registry_url := getattr(settings, 'SCHEMA_REGISTRY_URL', None): schema_registry_config = { From 932a377e13499782211929ed87680e2e4dcb70fb Mon Sep 17 00:00:00 2001 From: Robert Raposa Date: Wed, 20 Jul 2022 16:58:47 -0400 Subject: [PATCH 2/2] fixup! more comments --- edx_event_bus_kafka/publishing/event_producer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index c8bcbd1..285cab4 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -191,6 +191,9 @@ def on_event_deliver(err, evt): Arguments: err: Error if event production failed evt: Event that was delivered (or failed to be delivered) + + Note: This is meant to be temporary until we implement + more rigorous error handling. """ if err is not None: logger.warning(f"Event delivery failed: {err!r}") @@ -221,4 +224,6 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: producer.produce(topic, key=event_key, value=event_data, on_delivery=on_event_deliver, headers={EVENT_TYPE_HEADER_KEY: signal.event_type}) + # TODO (EventBus): Investigate poll() vs. flush(), and other related settings: + # See https://github.com/openedx/event-bus-kafka/issues/10 producer.poll() # wait indefinitely for the above event to either be delivered or fail