diff --git a/CHANGELOG.md b/CHANGELOG.md
index b94ba4f..95ad314 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+8.2.1 (2023-11-28)
+------------------
+
+**Fixes**
+- Close kafka consumer at the end of sync
+- Commit offsets synchronously.
+- Bump `confluent-kafka[protobuf]` from `2.2.*` to `2.3.*`
+
 8.2.0 (2023-11-17)
 ------------------
 - Add more info logs
diff --git a/setup.py b/setup.py
index 1cbebfb..87a041b 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = fh.read()
 
 setup(name='pipelinewise-tap-kafka',
-      version='8.2.0',
+      version='8.2.1',
       description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
@@ -19,7 +19,7 @@
       install_requires=[
           'pipelinewise-singer-python==2.*',
           'dpath==2.1.*',
-          'confluent-kafka[protobuf]==2.2.*',
+          'confluent-kafka[protobuf]==2.3.*',
           'grpcio-tools==1.57.*'
       ],
       extras_require={
diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index b44a3e5..9bdae8d 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -109,7 +109,14 @@ def epoch_to_iso_timestamp(epoch) -> str:
     return datetime.datetime.utcfromtimestamp(epoch / 1000).isoformat(timespec='milliseconds')
 
 
+def error_cb(err):
+    """Error callback for kafka consumer"""
+    LOGGER.info('An error occurred: %s', err)
+
+
 def init_kafka_consumer(kafka_config):
+    """Initialise kafka consumer"""
+    LOGGER.info('Initialising Kafka Consumer...')
     consumer_conf = {
@@ -125,6 +132,7 @@ def init_kafka_consumer(kafka_config):
         # Non-configurable parameters
         'enable.auto.commit': False,
         'value.deserializer': init_value_deserializer(kafka_config),
+        'error_cb': error_cb,
     }
 
     if kafka_config['debug_contexts']:
@@ -342,7 +350,8 @@ def commit_consumer_to_bookmarked_state(consumer, topic, state):
                                                    bookmarked_partition['offset'])
             offsets_to_commit.append(topic_partition)
 
-    consumer.commit(offsets=offsets_to_commit)
+    consumer.commit(offsets=offsets_to_commit, asynchronous=False)
+    LOGGER.info("Bookmarked offsets committed")
@@ -435,13 +444,16 @@
     streams = catalog.get('streams', [])
     topic_pos = search_in_list_of_dict_by_key_value(streams, 'tap_stream_id', topic)
 
-    if topic_pos != -1:
-        # Send the initial schema message
-        send_schema_message(streams[topic_pos])
+    if topic_pos == -1:
+        raise Exception(f'Invalid catalog object. Cannot find {topic} in catalog')
 
-        # Setup consumer
-        consumer = init_kafka_consumer(kafka_config)
+    # Send the initial schema message
+    send_schema_message(streams[topic_pos])
 
+    # Setup consumer
+    consumer = init_kafka_consumer(kafka_config)
+
+    try:
         partitions = select_kafka_partitions(consumer, kafka_config)
         partitions = set_partition_offsets(consumer, partitions, kafka_config, state)
@@ -450,5 +462,7 @@
 
         # Start consuming messages from kafka
         read_kafka_messages(consumer, kafka_config, state)
-    else:
-        raise Exception(f'Invalid catalog object. Cannot find {topic} in catalog')
+    finally:
+        # Leave group and commit final offsets
+        LOGGER.info('Explicitly closing Kafka consumer...')
+        consumer.close()
diff --git a/tests/unit/helper/kafka_consumer_mock.py b/tests/unit/helper/kafka_consumer_mock.py
index b5ec597..50feaa4 100644
--- a/tests/unit/helper/kafka_consumer_mock.py
+++ b/tests/unit/helper/kafka_consumer_mock.py
@@ -74,9 +74,10 @@ def assign(self, offsets=None):
         if offsets:
             self.assigned_offsets = offsets
 
-    def commit(self, offsets=None):
-        if offsets:
-            self.committed_offsets = offsets
+    def commit(self, *args, **kwargs):
+
+        if 'offsets' in kwargs:
+            self.committed_offsets = kwargs['offsets']
 
     def offsets_for_times(self, topic_partitions):
         if topic_partitions[0].offset == 1638132327000: