Skip to content

Commit

Permalink
Merge pull request #42 from myNameIsPatrick/fix/use_stored_offset
Browse files Browse the repository at this point in the history
Obey offsets stored in Kafka for existing groups
  • Loading branch information
spenczar authored Aug 19, 2020
2 parents 49de502 + 8fdcaed commit 4e489f8
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 38 deletions.
38 changes: 14 additions & 24 deletions adc/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import enum
import logging
from datetime import timedelta
from typing import Dict, Iterator, List, Optional, Set, Union
from typing import Dict, Iterator, List, Optional, Set

import confluent_kafka # type: ignore
import confluent_kafka.admin # type: ignore
Expand Down Expand Up @@ -44,25 +44,6 @@ def subscribe(self,
topic=topic,
partition=partition_id,
)
# FIXME: This doesn't obey offsets stored in Kafka for existing groups.
if self.conf.start_at is ConsumerStartPosition.EARLIEST:
tp.offset = confluent_kafka.OFFSET_BEGINNING
elif self.conf.start_at is ConsumerStartPosition.LATEST:
# FIXME: librdkafka has a bug in offset handling - it caches
# "OFFSET_END", and will repeatedly move to the end of the
# topic. See https://github.com/edenhill/librdkafka/pull/2876 -
# it should get fixed in v1.5 of librdkafka.

librdkafka_version = confluent_kafka.libversion()[0]
if librdkafka_version < "1.5.0":
self.logger.warn(
"In librdkafka before v1.5, LATEST offsets have buggy behavior; you may "
f"not receive data (your librdkafka version is {librdkafka_version}). See "
"https://github.com/confluentinc/confluent-kafka-dotnet/issues/1254.")
tp.offset = confluent_kafka.OFFSET_END
else:
tp.offset = self.conf.start_at

assignment.append(tp)

self.logger.debug("registering topic assignment")
Expand Down Expand Up @@ -186,7 +167,6 @@ def close(self):
class ConsumerStartPosition(enum.Enum):
EARLIEST = 1
LATEST = 2
PRODUCER = 3

def __str__(self):
return self.name.lower()
Expand All @@ -207,9 +187,8 @@ class ConsumerConfig:
# was marked done with consumer.mark_done, regardless of this setting. This
# is only used when the position in the stream is unknown.
#
# This can either be a logical offset via a ConsumerStartPosition value, or
# it can be a specific offset as an integer.
start_at: Union[ConsumerStartPosition, int] = ConsumerStartPosition.EARLIEST
# This is specified as a logical offset via a ConsumerStartPosition value.
start_at: ConsumerStartPosition = ConsumerStartPosition.EARLIEST

# Authentication package to pass in to read from Kafka.
auth: Optional[SASLAuth] = None
Expand Down Expand Up @@ -238,6 +217,17 @@ def _to_confluent_kafka(self) -> Dict:
}
config["default.topic.config"] = default_topic_config
elif self.start_at is ConsumerStartPosition.LATEST:
# FIXME: librdkafka has a bug in offset handling - it caches
# "OFFSET_END", and will repeatedly move to the end of the
# topic. See https://github.com/edenhill/librdkafka/pull/2876 -
# it should get fixed in v1.5 of librdkafka.

librdkafka_version = confluent_kafka.libversion()[0]
if librdkafka_version < "1.5.0":
self.logger.warn(
"In librdkafka before v1.5, LATEST offsets have buggy behavior; you may "
f"not receive data (your librdkafka version is {librdkafka_version}). See "
"https://github.com/confluentinc/confluent-kafka-dotnet/issues/1254.")
default_topic_config = config.get("default.topic.config", {})
default_topic_config = {
"auto.offset.reset": "LATEST",
Expand Down
4 changes: 2 additions & 2 deletions adc/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
def open(url: str,
mode: str = 'r',
auth: Optional[auth.SASLAuth] = None,
start_at: Union[consumer.ConsumerStartPosition, int] = consumer.ConsumerStartPosition.EARLIEST, # noqa: E501
start_at: consumer.ConsumerStartPosition = consumer.ConsumerStartPosition.EARLIEST, # noqa: E501
read_forever: bool = True,
) -> Union[producer.Producer, Iterable[confluent_kafka.Message]]:
group_id, broker_addresses, topics = kafka.parse_kafka_url(url)
Expand Down Expand Up @@ -43,7 +43,7 @@ def _open_consumer(
broker_addresses: List[str],
topics: List[str],
auth: Optional[auth.SASLAuth],
start_at: Union[consumer.ConsumerStartPosition, int],
start_at: consumer.ConsumerStartPosition,
read_forever: bool,
) -> Iterable[confluent_kafka.Message]:
client = consumer.Consumer(consumer.ConsumerConfig(
Expand Down
90 changes: 78 additions & 12 deletions tests/test_kafka_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,32 +85,98 @@ def test_consume_from_end(self):
self.assertEqual(msg.topic(), topic)
self.assertEqual(msg.value(), b"message 4")

def test_consume_from_specified_offset(self):
def test_consume_from_beginning(self):
# Write a few messages.
topic = "test_consume_from_specified_offset"
simple_write_msgs(self.kafka, topic, [
topic = "test_consume_from_beginning"
batch = [
"message 1",
"message 2",
"message 3",
"message 4",
])
]
simple_write_msgs(self.kafka, topic, batch)

# Start a consumer from the third message (offset '2')
# Start a consumer from the beginning.
consumer = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
broker_urls=[self.kafka.address],
group_id="test_consumer",
auth=self.kafka.auth,
start_at=2,
read_forever=False,
start_at=adc.consumer.ConsumerStartPosition.EARLIEST,
))
consumer.subscribe(topic)
stream = consumer.stream()
msgs = [msg for msg in stream]

msg = next(stream)
self.assertEqual(msg.topic(), topic)
self.assertEqual(msg.value(), b"message 3")
msg = next(stream)
self.assertEqual(msg.topic(), topic)
self.assertEqual(msg.value(), b"message 4")
self.assertEqual(len(batch), len(msgs))
for expected, actual in zip(batch, msgs):
self.assertEqual(actual.topic(), topic)
self.assertEqual(actual.value().decode(), expected)

def test_consume_stored_offsets(self):
# Write first batch of messages.
topic = "test_stored_offsets"
batch_1 = [
"message 1",
"message 2",
"message 3",
"message 4",
]
simple_write_msgs(self.kafka, topic, batch_1)

# Start first consumer, reading from earliest offset.
consumer_1 = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
broker_urls=[self.kafka.address],
group_id="test_consumer_1",
auth=self.kafka.auth,
read_forever=False,
start_at=adc.consumer.ConsumerStartPosition.EARLIEST,
))
consumer_1.subscribe(topic)
stream_1 = consumer_1.stream()
msgs_1 = [msg for msg in stream_1]

# Check that all messages from first batch are processed.
self.assertEqual(len(batch_1), len(msgs_1))
for expected, actual in zip(batch_1, msgs_1):
self.assertEqual(actual.topic(), topic)
self.assertEqual(actual.value().decode(), expected)

# Write second batch of messages.
batch_2 = [
"message 5",
"message 6",
"message 7",
]
simple_write_msgs(self.kafka, topic, batch_2)

# Read more messages from first consumer. This should now
# only read from the stored offset, so that only the second
# batch is processed.
stream_1 = consumer_1.stream()
msgs_1 = [msg for msg in stream_1]
self.assertEqual(len(batch_2), len(msgs_1))
for expected, actual in zip(batch_2, msgs_1):
self.assertEqual(actual.topic(), topic)
self.assertEqual(actual.value().decode(), expected)

# Start second consumer, also reading from earliest offset.
consumer_2 = adc.consumer.Consumer(adc.consumer.ConsumerConfig(
broker_urls=[self.kafka.address],
group_id="test_consumer_2",
auth=self.kafka.auth,
read_forever=False,
start_at=adc.consumer.ConsumerStartPosition.EARLIEST,
))
consumer_2.subscribe(topic)
stream_2 = consumer_2.stream()
msgs_2 = [msg for msg in stream_2]

# Now check that messages from both batches are processed.
self.assertEqual(len(batch_1 + batch_2), len(msgs_2))
for expected, actual in zip(batch_1 + batch_2, msgs_2):
self.assertEqual(actual.topic(), topic)
self.assertEqual(actual.value().decode(), expected)

def test_consume_not_forever(self):
topic = "test_consume_not_forever"
Expand Down

0 comments on commit 4e489f8

Please sign in to comment.