Fix Kafka offset checking test (#1212)
* Fixes intermittent failures in `tests/test_kafka_source_stage_pipe.py::test_kafka_source_commit`
* The test checked the offsets in a stage; however, the C++ impl of the source stage performs a commit after calling `on_next`. This also limited us to testing only sync commits.
* The updated test performs the offset check after the pipeline completes, at which point we can be assured that all commits have finished; a minimal sketch of this approach is shown below.
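
The sketch below illustrates the shape of that post-pipeline check, assuming the kafka-python `KafkaConsumer`/`KafkaAdminClient` APIs; the broker address, topic, and group id are placeholders, and the pipeline construction itself is elided:

```python
from kafka import KafkaAdminClient, KafkaConsumer

# Placeholder values; the real test pulls these from pytest fixtures.
bootstrap_servers = "localhost:9092"
input_topic = "test_input_topic"
group_id = "morpheus"

# Record the end offsets with a consumer in a *different* group, so reading them
# cannot interfere with the offsets committed by the source stage's group.
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id="offset_check")
consumer.subscribe([input_topic])
consumer.poll(timeout_ms=500)  # a single poll; the test helper loops until partitions are assigned
expected_offsets = consumer.end_offsets(consumer.assignment())

# ... build and run the pipeline to completion here ...

# Once the pipeline has finished, every commit (sync or async) has been issued,
# so the committed offsets for the stage's group should match the end offsets.
admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
committed = admin.list_consumer_group_offsets(group_id)
for topic_partition, expected_offset in expected_offsets.items():
    assert committed[topic_partition].offset == expected_offset
```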

Fixes #1217

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1212
dagardner-nv authored Sep 22, 2023
1 parent f8b774a commit 13d9d8c
Showing 2 changed files with 50 additions and 81 deletions.
27 changes: 17 additions & 10 deletions tests/_utils/kafka.py
@@ -53,23 +53,28 @@ def kafka_bootstrap_servers_fixture(kafka_server: (subprocess.Popen, int)): # p
yield f"localhost:{kafka_port}"


@pytest.fixture(name='kafka_consumer', scope='function')
def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
_kafka_consumer.subscribe([kafka_topics.output_topic])

# Wait until we have assigned partitions
def seek_to_beginning(kafka_consumer: "KafkaConsumer", timeout: int = PARTITION_ASSIGNMENT_TIMEOUT):
"""
Seeks to the beginning of the Kafka topic
"""
start = time.time()
end = start + PARTITION_ASSIGNMENT_TIMEOUT
end = start + timeout
partitions_assigned = False
while not partitions_assigned and time.time() <= end:
_kafka_consumer.poll(timeout_ms=20)
partitions_assigned = len(_kafka_consumer.assignment()) > 0
kafka_consumer.poll(timeout_ms=20)
partitions_assigned = len(kafka_consumer.assignment()) > 0
if not partitions_assigned:
time.sleep(0.1)

assert partitions_assigned

_kafka_consumer.seek_to_beginning()
kafka_consumer.seek_to_beginning()


@pytest.fixture(name='kafka_consumer', scope='function')
def kafka_consumer_fixture(kafka_topics: KafkaTopics, _kafka_consumer: "KafkaConsumer"):
_kafka_consumer.subscribe([kafka_topics.output_topic])
seek_to_beginning(_kafka_consumer)

yield _kafka_consumer

@@ -103,7 +108,9 @@ def _init_pytest_kafka() -> (bool, Exception):
                                                       'zookeeper_proc',
                                                       teardown_fn=teardown_fn,
                                                       scope='session')
-        _kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server', scope='function')
+        _kafka_consumer = pytest_kafka.make_kafka_consumer('kafka_server',
+                                                           scope='function',
+                                                           group_id='morpheus_unittest_consumer')
 
         return (True, None)
     except Exception as e:
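The partition-assignment wait is now a reusable `seek_to_beginning` helper rather than being inlined in the fixture. A minimal sketch of calling it directly, assuming a broker reachable at a placeholder address and a hypothetical topic name (the real tests obtain both from fixtures):

```python
from kafka import KafkaConsumer

from _utils.kafka import seek_to_beginning

# Placeholder broker and topic; fixtures normally supply these.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         group_id="morpheus_unittest_consumer",
                         auto_offset_reset="earliest")
consumer.subscribe(["test_output_topic"])

# Polls until partitions are assigned (or the timeout elapses), asserts the
# assignment happened, then rewinds the consumer to the start of its partitions.
seek_to_beginning(consumer, timeout=10)
records = consumer.poll(timeout_ms=500)
```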
104 changes: 33 additions & 71 deletions tests/test_kafka_source_stage_pipe.py
@@ -17,25 +17,26 @@
 import os
 import typing
 
-import mrc
 import pandas as pd
 import pytest
-from mrc.core import operators as ops
 
 from _utils import TEST_DIRS
 from _utils import assert_results
+from _utils.kafka import seek_to_beginning
 from _utils.kafka import write_data_to_kafka
 from _utils.kafka import write_file_to_kafka
 from _utils.stages.dfp_length_checker import DFPLengthChecker
 from morpheus.config import Config
 from morpheus.pipeline.linear_pipeline import LinearPipeline
-from morpheus.pipeline.single_port_stage import SinglePortStage
 from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
 from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
 from morpheus.stages.postprocess.serialize_stage import SerializeStage
 from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
 
+if (typing.TYPE_CHECKING):
+    from kafka import KafkaConsumer
+
 
 @pytest.mark.kafka
 def test_kafka_source_stage_pipe(config, kafka_bootstrap_servers: str, kafka_topics: typing.Tuple[str, str]) -> None:
@@ -93,90 +94,40 @@ def test_multi_topic_kafka_source_stage_pipe(config, kafka_bootstrap_servers: st
     assert_results(comp_stage.get_results())
 
 
-class OffsetChecker(SinglePortStage):
-    """
-    Verifies that the kafka offsets are being updated as a way of verifying that the
-    consumer is performing a commit.
-    """
-
-    def __init__(self, c: Config, bootstrap_servers: str, group_id: str):
-        super().__init__(c)
-
-        # Importing here so that running without the --run_kafka flag won't fail due
-        # to not having the kafka libs installed
-        from kafka import KafkaAdminClient
-
-        self._client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
-        self._group_id = group_id
-        self._offsets = None
-
-    @property
-    def name(self) -> str:
-        return "morpheus_offset_checker"
-
-    def accepted_types(self) -> typing.Tuple:
-        """
-        Accepted input types for this stage are returned.
-
-        Returns
-        -------
-        typing.Tuple
-            Accepted input types.
-
-        """
-        return (typing.Any, )
-
-    def supports_cpp_node(self):
-        return False
-
-    def _offset_checker(self, x):
-        at_least_one_gt = False
-        new_offsets = self._client.list_consumer_group_offsets(self._group_id)
-
-        if self._offsets is not None:
-            for (topic_partition, prev_offset) in self._offsets.items():
-                new_offset = new_offsets[topic_partition]
-
-                assert new_offset.offset >= prev_offset.offset
-
-                if new_offset.offset > prev_offset.offset:
-                    at_least_one_gt = True
-
-            assert at_least_one_gt
-
-        self._offsets = new_offsets
-
-        return x
-
-    def _build_single(self, builder: mrc.Builder, input_stream):
-        node = builder.make_node(self.unique_name, ops.map(self._offset_checker))
-        builder.make_edge(input_stream[0], node)
-
-        return node, input_stream[1]
-
-
 @pytest.mark.kafka
+@pytest.mark.parametrize('async_commits', [True, False])
 @pytest.mark.parametrize('num_records', [10, 100, 1000])
-def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,
-                             kafka_topics: typing.Tuple[str, str]) -> None:
+def test_kafka_source_commit(num_records: int,
+                             async_commits: bool,
+                             config: Config,
+                             kafka_bootstrap_servers: str,
+                             kafka_topics: typing.Tuple[str, str],
+                             kafka_consumer: "KafkaConsumer") -> None:
+    group_id = 'morpheus'
+
     data = [{'v': i} for i in range(num_records)]
     num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data)
     assert num_written == num_records
 
+    kafka_consumer.subscribe([kafka_topics.input_topic])
+    seek_to_beginning(kafka_consumer)
+    partitions = kafka_consumer.assignment()
+
+    # This method does not advance the consumer, and even if it did, this consumer has a different group_id than the
+    # source stage
+    expected_offsets = kafka_consumer.end_offsets(partitions)
+
     pipe = LinearPipeline(config)
     pipe.set_source(
         KafkaSourceStage(config,
                          bootstrap_servers=kafka_bootstrap_servers,
                          input_topic=kafka_topics.input_topic,
                          auto_offset_reset="earliest",
                          poll_interval="1seconds",
-                         group_id='morpheus',
+                         group_id=group_id,
                          client_id='morpheus_kafka_source_commit',
                          stop_after=num_records,
-                         async_commits=False))
-
-    pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus'))
+                         async_commits=async_commits))
     pipe.add_stage(TriggerStage(config))
 
     pipe.add_stage(DeserializeStage(config))
@@ -187,6 +138,17 @@ def test_kafka_source_commit(num_records, config, kafka_bootstrap_servers: str,

     assert_results(comp_stage.get_results())
 
+    from kafka import KafkaAdminClient
+    admin_client = KafkaAdminClient(bootstrap_servers=kafka_bootstrap_servers, client_id='offset_checker')
+    offsets = admin_client.list_consumer_group_offsets(group_id)
+
+    # The broker may have created additional partitions, offsets should be a superset of expected_offsets
+    for (topic_partition, expected_offset) in expected_offsets.items():
+        # The value of the offsets dict being returned is a tuple of (offset, metadata), while the value of the
+        # expected_offsets is just the offset.
+        actual_offset = offsets[topic_partition][0]
+        assert actual_offset == expected_offset
+
 
 @pytest.mark.kafka
 @pytest.mark.parametrize('num_records', [1000])
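The offset comparison above hinges on the different value shapes returned by the two lookups: `KafkaConsumer.end_offsets` maps each `TopicPartition` to a plain integer, while `KafkaAdminClient.list_consumer_group_offsets` maps it to an `OffsetAndMetadata` tuple, hence the `[0]` index. A small illustration with placeholder broker, topic, and group names:

```python
from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition

# Placeholder names; the test derives all of these from its fixtures.
bootstrap_servers = "localhost:9092"
partition = TopicPartition("test_input_topic", 0)

consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
end_offsets = consumer.end_offsets([partition])            # {TopicPartition: int}

admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
committed = admin.list_consumer_group_offsets("morpheus")  # {TopicPartition: OffsetAndMetadata}

# OffsetAndMetadata is a namedtuple, so [0] and .offset both yield the committed offset.
assert committed[partition][0] == end_offsets[partition]
```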
