-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add a stop method to the consumer class; address #57 #58
add a stop method to the consumer class; address #57 #58
Conversation
This can be used to signal the consumer to stop if it is running in a different thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall. I added some feedback that I think should be addressed, but mostly minor and/or nitpicky suggestions.
adc/consumer.py
Outdated
@@ -121,11 +128,14 @@ def _stream_forever(self, | |||
batch_size: int = 100, | |||
batch_timeout: timedelta = timedelta(seconds=1.0), | |||
) -> Iterator[confluent_kafka.Message]: | |||
while True: | |||
self.running = True | |||
while self.running is True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace all instances of self.running is True
with self.running
and all instances of self.running is False
with not self.running
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With use of an Event
this would become not self._stop. is_set()
, and similar for other uses.
adc/consumer.py
Outdated
@@ -25,6 +25,7 @@ def __init__(self, conf: 'ConsumerConfig') -> None: | |||
# Workaround for https://github.com/edenhill/librdkafka/issues/3263. | |||
# Remove once confluent-kafka-python 1.9.0 has been released. | |||
self._consumer.poll(0) | |||
self.running = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be better as a private variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, and furthermore I think it should probably be a threading.Event
object, instead of a simple boolean. Practically speaking, using a boolean will work fine in CPython as long as the GIL prevents threads from actually executing different parts of this code concurrently, but for compatibility with other implementations and future-proofing, I think a thread-safe object should be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should make a few changes for strict/pedantic thread safety.
adc/consumer.py
Outdated
@@ -25,6 +25,7 @@ def __init__(self, conf: 'ConsumerConfig') -> None: | |||
# Workaround for https://github.com/edenhill/librdkafka/issues/3263. | |||
# Remove once confluent-kafka-python 1.9.0 has been released. | |||
self._consumer.poll(0) | |||
self.running = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, and furthermore I think it should probably be a threading.Event
object, instead of a simple boolean. Practically speaking, using a boolean will work fine in CPython as long as the GIL prevents threads from actually executing different parts of this code concurrently, but for compatibility with other implementations and future-proofing, I think a thread-safe object should be used.
adc/consumer.py
Outdated
@@ -121,11 +128,14 @@ def _stream_forever(self, | |||
batch_size: int = 100, | |||
batch_timeout: timedelta = timedelta(seconds=1.0), | |||
) -> Iterator[confluent_kafka.Message]: | |||
while True: | |||
self.running = True | |||
while self.running is True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With use of an Event
this would become not self._stop. is_set()
, and similar for other uses.
thank you both. I have made the changes, please have a look when you can. |
I'll also mention that with this branch, I have now tested gwcelery termination on several occasions over the last 20 hours, and it has closed the connection and stopped the listener (https://git.ligo.org/emfollow/gwcelery/-/merge_requests/905/diffs#b03e78e6b901a90969bee266a4e115cf50cb5d89_119_92). Condor_rm removes all jobs successfully. |
This can be used to signal the consumer to stop if it is running in a different thread