This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

AP-1574 attempted fix to hanging consumer #164

Merged: 21 commits, Nov 28, 2023
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
+8.2.1 (2023-11-28)
+------------------
+
+**Fixes**
+- Close kafka consumer at the end of sync
+- Commit offsets synchronously.
+- Bump `confluent-kafka[protobuf]` from `2.2.*` to `2.3.*`
+
 8.2.0 (2023-11-17)
 ------------------
 - Add more info logs
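For context on the "Commit offsets synchronously" entry: confluent-kafka's `Consumer.commit()` is asynchronous by default, so the call can return before the broker has acknowledged the offsets. A minimal sketch of the difference, using placeholder broker, group, and topic names rather than the tap's real configuration:

```python
from confluent_kafka import Consumer, KafkaException, TopicPartition

# Placeholder configuration for illustration only.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'enable.auto.commit': False,
})

offsets = [TopicPartition('example-topic', 0, 42)]

# Default behaviour: the commit request is queued and the call returns
# immediately, so a failure is only reported later (if at all).
consumer.commit(offsets=offsets)

# Synchronous behaviour: the call blocks until the broker responds and
# raises KafkaException if the commit fails.
try:
    consumer.commit(offsets=offsets, asynchronous=False)
except KafkaException as exc:
    print(f'Offset commit failed: {exc}')

consumer.close()
```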
4 changes: 2 additions & 2 deletions setup.py
@@ -6,7 +6,7 @@
     long_description = fh.read()

 setup(name='pipelinewise-tap-kafka',
-      version='8.2.0',
+      version='8.2.1',
       description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
@@ -19,7 +19,7 @@
       install_requires=[
           'pipelinewise-singer-python==2.*',
           'dpath==2.1.*',
-          'confluent-kafka[protobuf]==2.2.*',
+          'confluent-kafka[protobuf]==2.3.*',
           'grpcio-tools==1.57.*'
       ],
       extras_require={
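If it is useful to confirm that the dependency bump actually took effect in a given environment, the client exposes version helpers; a small sketch, with example output values only:

```python
import confluent_kafka

# version() reports the Python client version as a (str, int) tuple;
# libversion() reports the bundled librdkafka version.
print(confluent_kafka.version()[0])     # e.g. '2.3.0'
print(confluent_kafka.libversion()[0])  # e.g. '2.3.0'
```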
30 changes: 22 additions & 8 deletions tap_kafka/sync.py
@@ -109,7 +109,14 @@ def epoch_to_iso_timestamp(epoch) -> str:
     return datetime.datetime.utcfromtimestamp(epoch / 1000).isoformat(timespec='milliseconds')


+def error_cb(err):
+    """Error callback for kafka consumer"""
+    LOGGER.info('An error occurred: %s', err)
+
+
 def init_kafka_consumer(kafka_config):
+    """Initialise kafka consumer"""
+
     LOGGER.info('Initialising Kafka Consumer...')

     consumer_conf = {
@@ -125,6 +132,7 @@ def init_kafka_consumer(kafka_config):
         # Non-configurable parameters
         'enable.auto.commit': False,
         'value.deserializer': init_value_deserializer(kafka_config),
+        'error_cb': error_cb,
     }

     if kafka_config['debug_contexts']:
@@ -342,7 +350,8 @@ def commit_consumer_to_bookmarked_state(consumer, topic, state):
                                          bookmarked_partition['offset'])
         offsets_to_commit.append(topic_partition)

-    consumer.commit(offsets=offsets_to_commit)
+    consumer.commit(offsets=offsets_to_commit, asynchronous=False)
+
     LOGGER.info("Bookmarked offsets committed")


@@ -435,13 +444,16 @@ def do_sync(kafka_config, catalog, state):
     streams = catalog.get('streams', [])
     topic_pos = search_in_list_of_dict_by_key_value(streams, 'tap_stream_id', topic)

-    if topic_pos != -1:
-        # Send the initial schema message
-        send_schema_message(streams[topic_pos])
+    if topic_pos == -1:
+        raise Exception(f'Invalid catalog object. Cannot find {topic} in catalog')

-        # Setup consumer
-        consumer = init_kafka_consumer(kafka_config)
+    # Send the initial schema message
+    send_schema_message(streams[topic_pos])

+    # Setup consumer
+    consumer = init_kafka_consumer(kafka_config)
+
+    try:
         partitions = select_kafka_partitions(consumer, kafka_config)

         partitions = set_partition_offsets(consumer, partitions, kafka_config, state)
@@ -450,5 +462,7 @@

         # Start consuming messages from kafka
         read_kafka_messages(consumer, kafka_config, state)
-    else:
-        raise Exception(f'Invalid catalog object. Cannot find {topic} in catalog')
+    finally:
+        # Leave group and commit final offsets
+        LOGGER.info('Explicitly closing Kafka consumer...')
+        consumer.close()
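Taken together, the sync.py changes follow a common confluent-kafka shutdown pattern: register an `error_cb` for client-level errors and wrap consumption in `try`/`finally` so `close()` always runs and the consumer leaves its group. The sketch below is an illustrative standalone loop, not the tap's code; the broker, group, topic, and timeout values are placeholders:

```python
from confluent_kafka import Consumer, KafkaError


def error_cb(err: KafkaError) -> None:
    # Invoked from poll() for client-level errors (e.g. all brokers down).
    print(f'Kafka client error: {err}')


consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'example-group',            # placeholder
    'enable.auto.commit': False,
    'error_cb': error_cb,
})
consumer.subscribe(['example-topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # Message-level errors arrive here, not via error_cb.
            print(f'Consume error: {msg.error()}')
            continue
        print(msg.value())
except KeyboardInterrupt:
    pass
finally:
    # Always leave the consumer group cleanly, even if the loop raises;
    # skipping close() can leave a consumer lingering in the group until
    # its session times out.
    consumer.close()
```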
7 changes: 4 additions & 3 deletions tests/unit/helper/kafka_consumer_mock.py
@@ -74,9 +74,10 @@ def assign(self, offsets=None):
         if offsets:
             self.assigned_offsets = offsets

-    def commit(self, offsets=None):
-        if offsets:
-            self.committed_offsets = offsets
+    def commit(self, *args, **kwargs):
+
+        if 'offsets' in kwargs:
+            self.committed_offsets = kwargs['offsets']

     def offsets_for_times(self, topic_partitions):
         if topic_partitions[0].offset == 1638132327000:
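The mock's `commit()` is loosened because the production code now calls it with an extra keyword argument (`asynchronous=False`); accepting `*args`/`**kwargs` and reading `kwargs['offsets']` keeps the recorded offsets available for assertions. A tiny, hypothetical illustration of the call shape it has to absorb (the `ConsumerMock` name is made up; the project's real mock lives in `kafka_consumer_mock.py`):

```python
from confluent_kafka import TopicPartition


class ConsumerMock:
    """Stand-in for the project's consumer mock; name and scope are assumed."""

    def __init__(self):
        self.committed_offsets = None

    def commit(self, *args, **kwargs):
        # Mirror the real Consumer.commit(offsets=..., asynchronous=...) call shape.
        if 'offsets' in kwargs:
            self.committed_offsets = kwargs['offsets']


mock = ConsumerMock()
mock.commit(offsets=[TopicPartition('dummy_topic', 0, 123)], asynchronous=False)
assert mock.committed_offsets[0].offset == 123
```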