docs: replace TODOs with comments #9

Merged · 2 commits · Jul 20, 2022
16 changes: 12 additions & 4 deletions edx_event_bus_kafka/publishing/event_producer.py
@@ -67,7 +67,8 @@ def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict:
Returns:
Schema for some field

TODO: Move to openedx_events.event_bus.avro.serializer?
Note: Avro helpers could be moved to openedx_events.event_bus.avro.serializer to be
used by event bus implementations other than Kafka.
"""
subschema = serializer_schema
for field_name in field_path:
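
For orientation, a minimal sketch of the kind of descent this loop performs, assuming the serializer schema is a plain dict of nested Avro record definitions; the helper name and example schema below are illustrative only, not this package's actual code:

```python
# Illustrative sketch (not the package's code): walk an Avro record schema down
# a period-delimited field path, as descend_avro_schema does above.
from typing import List


def descend_schema_sketch(serializer_schema: dict, field_path: List[str]):
    """Return the schema of the field named by field_path, assuming nested record schemas."""
    subschema = serializer_schema
    for field_name in field_path:
        # Each level is assumed to look like {"fields": [{"name": ..., "type": ...}, ...]};
        # pick the matching field entry and descend into its type.
        matching = [field for field in subschema["fields"] if field["name"] == field_name]
        subschema = matching[0]["type"]
    return subschema


# Hypothetical example: resolve "user.id" against a tiny nested record schema.
example_schema = {"fields": [{"name": "user", "type": {"fields": [{"name": "id", "type": "long"}]}}]}
assert descend_schema_sketch(example_schema, ["user", "id"]) == "long"
```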
@@ -114,9 +115,6 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
return AvroSignalSerializer(signal)


# TODO: Cache this, but in a way that still allows changes to settings
# via remote-config (and in particular does not result in mixed
# cache/uncached configuration).
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
"""
Create the producer for a signal and a key field path.
@@ -127,6 +125,11 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
signal: The OpenEdxPublicSignal to make a producer for
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)

Performance note:
This could be cached, but requires care such that it still allows changes to settings via
remote-config (and in particular does not result in mixed cached/uncached configuration).
This complexity is being deferred until it becomes a performance issue.
"""
if schema_registry_url := getattr(settings, 'SCHEMA_REGISTRY_URL', None):
schema_registry_config = {
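
The hunk breaks off just as the registry configuration begins. For context, a hedged sketch of how a SerializingProducer can be wired to a schema registry using confluent-kafka's public API; SCHEMA_REGISTRY_URL matches the setting read above, while KAFKA_BOOTSTRAP_SERVERS, the function name, and the serializer choices are illustrative assumptions rather than this package's actual code:

```python
# Illustrative sketch, not the package's implementation: build a SerializingProducer
# whose value serializer registers and resolves Avro schemas via the schema registry.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer


def make_producer_sketch(settings, value_schema_str: str) -> SerializingProducer:
    # SCHEMA_REGISTRY_URL is the setting shown in the diff; KAFKA_BOOTSTRAP_SERVERS
    # is an assumed setting name, used here only for illustration.
    schema_registry_client = SchemaRegistryClient({"url": settings.SCHEMA_REGISTRY_URL})
    return SerializingProducer({
        "bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
        "key.serializer": StringSerializer("utf_8"),
        "value.serializer": AvroSerializer(schema_registry_client, value_schema_str),
    })
```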
@@ -188,6 +191,9 @@ def on_event_deliver(err, evt):
Arguments:
err: Error if event production failed
evt: Event that was delivered (or failed to be delivered)

Note: This is meant to be temporary until we implement
more rigorous error handling.
"""
if err is not None:
logger.warning(f"Event delivery failed: {err!r}")
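
The success branch of the callback is cut off by the hunk. As a hedged sketch, a delivery callback of this shape can use confluent-kafka's Message accessors on success; the success-path log message below is an assumption, not this package's actual text:

```python
# Illustrative sketch of the delivery-callback shape confluent-kafka expects;
# the success-path logging here is an assumption, not the package's actual code.
import logging

logger = logging.getLogger(__name__)


def on_event_deliver_sketch(err, evt):
    """Log the outcome of an asynchronous produce(); evt is a confluent_kafka Message."""
    if err is not None:
        logger.warning(f"Event delivery failed: {err!r}")
    else:
        # On success, the Message exposes where the event landed.
        logger.info(f"Delivered event to {evt.topic()} [partition {evt.partition()}, offset {evt.offset()}]")
```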
@@ -218,4 +224,6 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field:
producer.produce(topic, key=event_key, value=event_data,
on_delivery=on_event_deliver,
headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
# TODO (EventBus): Investigate poll() vs. flush(), and other related settings:
# See https://github.com/openedx/event-bus-kafka/issues/10
producer.poll() # wait indefinitely for the above event to either be delivered or fail
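
For the poll()-vs-flush() question deferred to issue #10, a hedged sketch of the trade-off using the documented confluent-kafka Producer API; the timeout values and the helper name are illustrative assumptions only:

```python
# Illustrative only: the two options mentioned in the TODO above.
from confluent_kafka import SerializingProducer


def drain_sketch(producer: SerializingProducer) -> None:
    # poll(0) serves delivery callbacks for events that have already completed,
    # without blocking the calling (e.g. request) thread until delivery.
    producer.poll(0)

    # flush(10) blocks for up to 10 seconds until every queued event is delivered
    # or fails, so it fits shutdown paths better than per-event calls.
    producer.flush(10)
```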