diff --git a/requirements.txt b/requirements.txt
index 1ecc759..4d5e5e2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,3 +4,5 @@ requests==2.20.0
 strict-rfc3339==0.7
 nose==1.3.7
 jsonschema==2.6.0
+kafkian==0.13.0
+requests==2.20.0
\ No newline at end of file
diff --git a/setup.py b/setup.py
index daa76c8..9a2f3ea 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
     long_description = fh.read()
 
 setup(name='pipelinewise-tap-kafka',
-      version='4.0.0',
+      version='4.1.0',
       description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
@@ -20,7 +20,9 @@
           'kafka-python==2.0.1',
           'pipelinewise-singer-python==1.*',
           'dpath==2.0.1',
-          'filelock==3.0.12'
+          'filelock==3.0.12',
+          'kafkian==0.13.0',
+          'requests==2.20.0'
       ],
       extras_require={
           "test": [
diff --git a/tap_kafka/__init__.py b/tap_kafka/__init__.py
index 242b5ed..8c6ef99 100644
--- a/tap_kafka/__init__.py
+++ b/tap_kafka/__init__.py
@@ -10,7 +10,7 @@
 import tap_kafka.sync as sync
 import tap_kafka.common as common
 
-LOGGER = singer.get_logger('tap_kafka')
+LOGGER = singer.get_logger()
 
 REQUIRED_CONFIG_KEYS = [
     'bootstrap_servers',
@@ -42,11 +42,20 @@ def do_discovery(config):
     """Discover kafka topic by trying to connect to the topic and generate
     singer schema according to the config"""
     try:
+        if "avro_schema" in config and config["avro_schema"]:
+            LOGGER.info(f"avro_schema value set to {config['avro_schema']}, using avro deserializer.")
+            from kafkian.serde.deserialization import AvroDeserializer
+            avd = AvroDeserializer(schema_registry_url=config['avro_schema'])
+            deserializer = avd.deserialize
+        else:
+            def deserializer(m):
+                return json.loads(m.decode(config['encoding']))
         consumer = KafkaConsumer(config['topic'],
                                  group_id=config['group_id'],
                                  enable_auto_commit=False,
                                  consumer_timeout_ms=config['consumer_timeout_ms'],
-                                 # value_deserializer=lambda m: json.loads(m.decode('ascii'))
+                                 security_protocol=config['security_protocol'],
+                                 value_deserializer=deserializer,
                                  bootstrap_servers=config['bootstrap_servers'].split(','))
 
     except Exception as ex:
@@ -77,7 +86,7 @@ def generate_config(args_config):
 
         # Add optional parameters with defaults
         'primary_keys': args_config.get('primary_keys', {}),
-        'max_runtime_ms': args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS),
+        'max_runtime_ms': int(args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS)),
         'commit_interval_ms': args_config.get('commit_interval_ms', DEFAULT_COMMIT_INTERVAL_MS),
         'batch_size_rows': args_config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS),
         'batch_flush_interval_ms': args_config.get('batch_flush_interval_ms', DEFAULT_BATCH_FLUSH_INTERVAL_MS),
@@ -89,7 +98,9 @@ def generate_config(args_config):
         'encoding': args_config.get('encoding', DEFAULT_ENCODING),
         'local_store_dir': args_config.get('local_store_dir', DEFAULT_LOCAL_STORE_DIR),
         'local_store_batch_size_rows': args_config.get('local_store_batch_size_rows',
-                                                        DEFAULT_LOCAL_STORE_BATCH_SIZE_ROWS)
+                                                        DEFAULT_LOCAL_STORE_BATCH_SIZE_ROWS),
+        'avro_schema': args_config.get('avro_schema', ''),
+        'security_protocol': args_config.get('security_protocol', 'SSL')
     }
 
 
@@ -100,9 +111,15 @@ def main_impl():
     if args.discover:
         do_discovery(args.config)
+
     elif args.properties:
         state = args.state or {}
         sync.do_sync(kafka_config, args.properties, state, fn_get_args=get_args)
+    elif "catalog_path" in args and args.catalog_path:
+        state = args.state or {}
+        with open(args.catalog_path) as json_file:
+            catalog = json.load(json_file)
+            sync.do_sync(kafka_config, catalog, state, fn_get_args=get_args)
     else:
         LOGGER.info("No properties were selected")
 
 
diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index bf244ae..5f63c38 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -11,7 +11,7 @@
 from .errors import InvalidBookmarkException
 
-LOGGER = singer.get_logger('tap_kafka')
+LOGGER = singer.get_logger()
 
 LOG_MESSAGES_PERIOD = 1000       # Print log messages to stderr after every nth messages
 UPDATE_BOOKMARK_PERIOD = 1000    # Update and send bookmark to stdout after nth messages
 
@@ -69,6 +69,14 @@ def init_local_store(kafka_config):
 
 def init_kafka_consumer(kafka_config):
     LOGGER.info('Initialising Kafka Consumer...')
+    if "avro_schema" in kafka_config and kafka_config["avro_schema"]:
+        LOGGER.info(f"avro_schema value set to {kafka_config['avro_schema']}, using avro deserializer.")
+        from kafkian.serde.deserialization import AvroDeserializer
+        avd = AvroDeserializer(schema_registry_url=kafka_config['avro_schema'])
+        deserializer = avd.deserialize
+    else:
+        def deserializer(m):
+            return json.loads(m.decode(kafka_config['encoding']))
     return KafkaConsumer(
         # Required parameters
         kafka_config['topic'],
@@ -83,9 +91,10 @@
         max_poll_interval_ms=kafka_config['max_poll_interval_ms'],
 
         # Non-configurable parameters
-        enable_auto_commit=False,
+        enable_auto_commit=True,
         auto_offset_reset='earliest',
-        value_deserializer=lambda m: json.loads(m.decode(kafka_config['encoding'])))
+        security_protocol=kafka_config['security_protocol'],
+        value_deserializer=deserializer)
 
 
 def kafka_message_to_singer_record(message, topic, primary_keys):
@@ -216,6 +225,7 @@ def read_kafka_topic(consumer, local_store, kafka_config, state, fn_get_args):
 
     local_store.insert(singer.format_message(singer.StateMessage(value=copy.deepcopy(state))))
     local_store.persist_messages()
     commit_kafka_consumer(consumer, message.topic, message.partition, message.offset)
+    consumer.close()
 
     return last_flush_ts
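
The Avro-vs-JSON deserializer selection appears twice in this change set, once in do_discovery() (tap_kafka/__init__.py) and once in init_kafka_consumer() (tap_kafka/sync.py). Below is a minimal sketch of how that branch could be factored into a shared helper; it relies only on the kafkian AvroDeserializer call already used in the diff, and the helper name build_value_deserializer is hypothetical, not part of the change set.

import json


def build_value_deserializer(kafka_config):
    """Sketch of a shared helper mirroring the branch added to both
    do_discovery() and init_kafka_consumer(): use kafkian's Avro deserializer
    when 'avro_schema' holds a schema registry URL, otherwise decode messages
    as JSON with the configured encoding."""
    if kafka_config.get('avro_schema'):
        # Imported lazily so the JSON-only path does not require kafkian
        from kafkian.serde.deserialization import AvroDeserializer
        return AvroDeserializer(schema_registry_url=kafka_config['avro_schema']).deserialize

    def deserializer(message_bytes):
        # Original behaviour: JSON payloads decoded with config['encoding']
        return json.loads(message_bytes.decode(kafka_config['encoding']))

    return deserializer

With the defaults added to generate_config() ('avro_schema': '' and 'security_protocol': 'SSL'), the JSON branch keeps existing topics working unchanged, while supplying a schema registry URL in avro_schema switches the consumer to Avro deserialization.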