From 22fc94be1fab6b5ce938041b172ba501f4acd51b Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Wed, 30 Jun 2021 14:19:56 -0500
Subject: [PATCH 1/8] - Adding functionality to configure the Avro deserializer
 and the security protocol.

---
 requirements.txt      |  1 +
 tap_kafka/__init__.py | 17 ++++++++++++++---
 tap_kafka/sync.py     | 13 +++++++++++--
 3 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 1ecc759..31f76ae 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,4 @@ requests==2.20.0
 strict-rfc3339==0.7
 nose==1.3.7
 jsonschema==2.6.0
+kafkian==0.13.0

diff --git a/tap_kafka/__init__.py b/tap_kafka/__init__.py
index 242b5ed..c56db63 100644
--- a/tap_kafka/__init__.py
+++ b/tap_kafka/__init__.py
@@ -10,7 +10,7 @@
 import tap_kafka.sync as sync
 import tap_kafka.common as common
 
-LOGGER = singer.get_logger('tap_kafka')
+LOGGER = singer.get_logger()
 
 REQUIRED_CONFIG_KEYS = [
     'bootstrap_servers',
@@ -42,11 +42,20 @@ def do_discovery(config):
     """Discover kafka topic by trying to connect to the topic and generate
     singer schema according to the config"""
     try:
+        if "avro_schema" in config and config["avro_schema"]:
+            LOGGER.info(f"avro_schema value set to {config['avro_schema']}, using avro deserializer.")
+            from kafkian.serde.deserialization import AvroDeserializer
+            avd = AvroDeserializer(schema_registry_url=config['avro_schema'])
+            deserializer = avd.deserialize
+        else:
+            def deserializer(m):
+                return json.loads(m.decode(config['encoding']))
         consumer = KafkaConsumer(config['topic'],
                                  group_id=config['group_id'],
                                  enable_auto_commit=False,
                                  consumer_timeout_ms=config['consumer_timeout_ms'],
-                                 # value_deserializer=lambda m: json.loads(m.decode('ascii'))
+                                 security_protocol=config['security_protocol'],
+                                 value_deserializer=deserializer,
                                  bootstrap_servers=config['bootstrap_servers'].split(','))
 
     except Exception as ex:
@@ -89,7 +98,9 @@ def generate_config(args_config):
         'encoding': args_config.get('encoding', DEFAULT_ENCODING),
         'local_store_dir': args_config.get('local_store_dir', DEFAULT_LOCAL_STORE_DIR),
         'local_store_batch_size_rows': args_config.get('local_store_batch_size_rows',
-                                                       DEFAULT_LOCAL_STORE_BATCH_SIZE_ROWS)
+                                                       DEFAULT_LOCAL_STORE_BATCH_SIZE_ROWS),
+        'avro_schema': args_config.get('avro_schema', ''),
+        'security_protocol': args_config.get('security_protocol', 'SSL')
     }

diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index bf244ae..1366cdc 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -11,7 +11,7 @@
 from .errors import InvalidBookmarkException
 
-LOGGER = singer.get_logger('tap_kafka')
+LOGGER = singer.get_logger()
 
 LOG_MESSAGES_PERIOD = 1000     # Print log messages to stderr after every nth messages
 UPDATE_BOOKMARK_PERIOD = 1000  # Update and send bookmark to stdout after nth messages
@@ -69,6 +69,14 @@ def init_local_store(kafka_config):
 
 def init_kafka_consumer(kafka_config):
     LOGGER.info('Initialising Kafka Consumer...')
+    if "avro_schema" in kafka_config and kafka_config["avro_schema"]:
+        LOGGER.info(f"avro_schema value set to {kafka_config['avro_schema']}, using avro deserializer.")
+        from kafkian.serde.deserialization import AvroDeserializer
+        avd = AvroDeserializer(schema_registry_url=kafka_config['avro_schema'])
+        deserializer = avd.deserialize
+    else:
+        def deserializer(m):
+            return json.loads(m.decode(kafka_config['encoding']))
     return KafkaConsumer(
         # Required parameters
         kafka_config['topic'],
@@ -85,7 +93,8 @@ def init_kafka_consumer(kafka_config):
         # Non-configurable parameters
         enable_auto_commit=False,
         auto_offset_reset='earliest',
-        value_deserializer=lambda m: json.loads(m.decode(kafka_config['encoding'])))
+        security_protocol=kafka_config['security_protocol'],
+        value_deserializer=deserializer)
 
 
 def kafka_message_to_singer_record(message, topic, primary_keys):
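Note on PATCH 1/8: the new `avro_schema` setting holds a Schema Registry URL, not an inline Avro schema; it is passed straight into kafkian's `AvroDeserializer(schema_registry_url=...)`. The same selection branch appears in both tap_kafka/__init__.py and tap_kafka/sync.py, so the logic can be read as one helper. A minimal sketch under that reading — the helper name `build_value_deserializer` and the example registry URL are illustrative, not part of the patch:

    import json

    def build_value_deserializer(config):
        """Select the value deserializer the way the patch does: Avro via a
        Schema Registry when 'avro_schema' is set, otherwise JSON decoded in
        the configured encoding. Sketch only; this helper is not in the patch."""
        if config.get('avro_schema'):
            # Deferred import, as in the patch: kafkian is only needed for Avro.
            from kafkian.serde.deserialization import AvroDeserializer
            return AvroDeserializer(schema_registry_url=config['avro_schema']).deserialize

        def deserializer(m):
            return json.loads(m.decode(config['encoding']))
        return deserializer

    # Usage sketch (hypothetical values):
    # deserializer = build_value_deserializer({'avro_schema': 'http://localhost:8081',
    #                                          'encoding': 'utf-8'})

Because `generate_config` defaults `avro_schema` to `''`, the truthiness check falls through cleanly to the JSON path whenever the option is unset.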
From d6341f7cf29c8384819c04266b35720bb8770424 Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Wed, 30 Jun 2021 14:20:42 -0500
Subject: [PATCH 2/8] - Adding kafkian to dependencies.

---
 setup.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index daa76c8..1521744 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,8 @@
         'kafka-python==2.0.1',
         'pipelinewise-singer-python==1.*',
         'dpath==2.0.1',
-        'filelock==3.0.12'
+        'filelock==3.0.12',
+        'kafkian==0.13.0'
     ],
     extras_require={
         "test": [

From 16b9fb5768ffee0d6d493f69bda248f04c699155 Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Wed, 30 Jun 2021 14:28:27 -0500
Subject: [PATCH 3/8] - Bumping feature version.

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 1521744..f8c1fdb 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
 long_description = fh.read()
 
 setup(name='pipelinewise-tap-kafka',
-      version='4.0.0',
+      version='4.1.0',
       description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',

From 0588493d792a2e819e97a6f1798b12a9631ea84b Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Thu, 1 Jul 2021 15:37:45 -0500
Subject: [PATCH 4/8] - Adding the option to pass the catalog file path as
 input; making singer-compatible changes.

---
 requirements.txt      | 1 +
 setup.py              | 3 ++-
 tap_kafka/__init__.py | 6 ++++++
 3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 31f76ae..4d5e5e2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,3 +5,4 @@ strict-rfc3339==0.7
 nose==1.3.7
 jsonschema==2.6.0
 kafkian==0.13.0
+requests==2.20.0
\ No newline at end of file

diff --git a/setup.py b/setup.py
index f8c1fdb..9a2f3ea 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,8 @@
         'pipelinewise-singer-python==1.*',
         'dpath==2.0.1',
         'filelock==3.0.12',
-        'kafkian==0.13.0'
+        'kafkian==0.13.0',
+        'requests==2.20.0'
     ],
     extras_require={
         "test": [

diff --git a/tap_kafka/__init__.py b/tap_kafka/__init__.py
index c56db63..989f7dc 100644
--- a/tap_kafka/__init__.py
+++ b/tap_kafka/__init__.py
@@ -111,9 +111,15 @@ def main_impl():
     if args.discover:
         do_discovery(args.config)
+
     elif args.properties:
         state = args.state or {}
         sync.do_sync(kafka_config, args.properties, state, fn_get_args=get_args)
+    elif "catalog_path" in args and args.catalog_path:
+        state = args.state or {}
+        with open(args.catalog_path) as json_file:
+            catalog = json.load(json_file)
+        sync.do_sync(kafka_config, catalog, state, fn_get_args=get_args)
     else:
         LOGGER.info("No properties were selected")
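Note on PATCH 4/8: the new `catalog_path` branch lets the tap load a Singer catalog from a file when `--properties` is not supplied; the `elif` ordering keeps `args.properties` taking precedence. A minimal standalone sketch of that branch — the `load_catalog` helper and the file name are illustrative, not part of the patch:

    import json

    def load_catalog(catalog_path):
        """Mirror the patch's branch: read a Singer catalog (JSON) from disk."""
        with open(catalog_path) as json_file:
            return json.load(json_file)

    # Usage sketch (hypothetical path); the loaded catalog is handed to
    # sync.do_sync() exactly like a catalog passed via --properties:
    # catalog = load_catalog('catalog.json')
    # sync.do_sync(kafka_config, catalog, args.state or {}, fn_get_args=get_args)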
From afc7f59987707466b98d1f2d3df6979a8a58a10f Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Fri, 16 Jul 2021 14:20:42 -0500
Subject: [PATCH 5/8] - Making auto commit True.

---
 tap_kafka/sync.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index 1366cdc..9f73bb0 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -91,7 +91,7 @@ def deserializer(m):
         max_poll_interval_ms=kafka_config['max_poll_interval_ms'],
 
         # Non-configurable parameters
-        enable_auto_commit=False,
+        enable_auto_commit=True,
         auto_offset_reset='earliest',
         security_protocol=kafka_config['security_protocol'],
         value_deserializer=deserializer)

From 61e82e5d547b69f8fdc90f310f5db04c6944b0e0 Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Thu, 22 Jul 2021 14:56:24 -0500
Subject: [PATCH 6/8] - Closing the connection upon hard stop.

---
 tap_kafka/sync.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index 9f73bb0..6494c40 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -217,6 +217,7 @@ def read_kafka_topic(consumer, local_store, kafka_config, state, fn_get_args):
             max_runtime_s = max_runtime_ms / 1000
             if now >= (start_time + max_runtime_s):
                 LOGGER.info(f'Max runtime {max_runtime_s} seconds exceeded. Stop consuming more messages.')
+                consumer.close()
                 break
 
     # Update singer bookmark at the last time to point it the the last processed offset

From a7d67367a0fe03890d6390ed6f82cc3374bca2cb Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Thu, 22 Jul 2021 16:01:55 -0500
Subject: [PATCH 7/8] - Moving consumer.close() to after the final offset
 commit.

---
 tap_kafka/sync.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index 6494c40..5f63c38 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -217,7 +217,6 @@ def read_kafka_topic(consumer, local_store, kafka_config, state, fn_get_args):
             max_runtime_s = max_runtime_ms / 1000
             if now >= (start_time + max_runtime_s):
                 LOGGER.info(f'Max runtime {max_runtime_s} seconds exceeded. Stop consuming more messages.')
-                consumer.close()
                 break
 
     # Update singer bookmark at the last time to point it the the last processed offset
@@ -226,7 +225,7 @@ def read_kafka_topic(consumer, local_store, kafka_config, state, fn_get_args):
     if len(local_store.messages_to_persist) > 0:
         local_store.insert(singer.format_message(singer.StateMessage(value=copy.deepcopy(state))))
         local_store.persist_messages()
         commit_kafka_consumer(consumer, message.topic, message.partition, message.offset)
+    consumer.close()
 
     return last_flush_ts

From 57e03690c32d7cea85e68d9c9286b94ba2595f46 Mon Sep 17 00:00:00 2001
From: Vishu Guntupalli
Date: Fri, 23 Jul 2021 15:53:34 -0500
Subject: [PATCH 8/8] - Casting max_runtime_ms to int.

---
 tap_kafka/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tap_kafka/__init__.py b/tap_kafka/__init__.py
index 989f7dc..8c6ef99 100644
--- a/tap_kafka/__init__.py
+++ b/tap_kafka/__init__.py
@@ -86,7 +86,7 @@ def generate_config(args_config):
 
         # Add optional parameters with defaults
         'primary_keys': args_config.get('primary_keys', {}),
-        'max_runtime_ms': args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS),
+        'max_runtime_ms': int(args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS)),
         'commit_interval_ms': args_config.get('commit_interval_ms', DEFAULT_COMMIT_INTERVAL_MS),
         'batch_size_rows': args_config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS),
         'batch_flush_interval_ms': args_config.get('batch_flush_interval_ms', DEFAULT_BATCH_FLUSH_INTERVAL_MS),
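Note on PATCH 8/8: `max_runtime_ms` can arrive as a string when it comes from a JSON config, and `read_kafka_topic` divides it by 1000 (see the context lines in PATCH 6/8), which raises `TypeError` for a string. The `int(...)` cast normalises the type up front. A small sketch of the failure mode — the default value shown is an assumption for illustration; the real constant lives in tap_kafka/__init__.py:

    DEFAULT_MAX_RUNTIME_MS = 300000  # assumed default, for illustration only

    def max_runtime_seconds(args_config):
        """Normalise max_runtime_ms to int before the division done in sync.py."""
        max_runtime_ms = int(args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS))
        return max_runtime_ms / 1000

    print(max_runtime_seconds({'max_runtime_ms': '600000'}))  # 600.0
    # Without the cast, '600000' / 1000 would raise TypeError at runtime.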