This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Avro deserializer #29

Open
wants to merge 8 commits into master
Changes from all commits
2 changes: 2 additions & 0 deletions requirements.txt
@@ -4,3 +4,5 @@ requests==2.20.0
strict-rfc3339==0.7
nose==1.3.7
jsonschema==2.6.0
+kafkian==0.13.0
+requests==2.20.0
6 changes: 4 additions & 2 deletions setup.py
@@ -6,7 +6,7 @@
long_description = fh.read()

setup(name='pipelinewise-tap-kafka',
-version='4.0.0',
+version='4.1.0',
description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
long_description=long_description,
long_description_content_type='text/markdown',
@@ -20,7 +20,9 @@
'kafka-python==2.0.1',
'pipelinewise-singer-python==1.*',
'dpath==2.0.1',
-'filelock==3.0.12'
+'filelock==3.0.12',
+'kafkian==0.13.0',
+'requests==2.20.0'
],
extras_require={
"test": [
25 changes: 21 additions & 4 deletions tap_kafka/__init__.py
@@ -10,7 +10,7 @@
import tap_kafka.sync as sync
import tap_kafka.common as common

-LOGGER = singer.get_logger('tap_kafka')
+LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = [
'bootstrap_servers',
@@ -42,11 +42,20 @@ def do_discovery(config):
"""Discover kafka topic by trying to connect to the topic and generate singer schema
according to the config"""
try:
if "avro_schema" in config and config["avro_schema"]:
LOGGER.info(f"avro_schema value set to {config['avro_schema']}, using avro deserializer.")
from kafkian.serde.deserialization import AvroDeserializer
avd = AvroDeserializer(schema_registry_url=config['avro_schema'])
deserializer = avd.deserialize
else:
def deserializer(m):
return json.loads(m.decode(config['encoding']))
consumer = KafkaConsumer(config['topic'],
group_id=config['group_id'],
enable_auto_commit=False,
consumer_timeout_ms=config['consumer_timeout_ms'],
-# value_deserializer=lambda m: json.loads(m.decode('ascii'))
+security_protocol=config['security_protocol'],
+value_deserializer=deserializer,
bootstrap_servers=config['bootstrap_servers'].split(','))

except Exception as ex:
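
The same Avro-or-JSON branch appears again in init_kafka_consumer in tap_kafka/sync.py below. A minimal sketch of how the selection could live in one shared helper, assuming only what the diff shows about kafkian (AvroDeserializer taking a schema_registry_url keyword and exposing a deserialize method); the helper name build_value_deserializer is hypothetical and not part of this PR.

import json

# Hypothetical helper, not part of this PR: one place to pick the value
# deserializer used by both do_discovery and init_kafka_consumer.
def build_value_deserializer(config):
    """Return a callable mapping raw Kafka message bytes to a Python object."""
    if config.get('avro_schema'):
        # Assumes the kafkian API exactly as used in the hunk above.
        from kafkian.serde.deserialization import AvroDeserializer
        return AvroDeserializer(schema_registry_url=config['avro_schema']).deserialize
    encoding = config.get('encoding', 'utf-8')
    return lambda m: json.loads(m.decode(encoding))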
Expand Down Expand Up @@ -77,7 +86,7 @@ def generate_config(args_config):

# Add optional parameters with defaults
'primary_keys': args_config.get('primary_keys', {}),
-'max_runtime_ms': args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS),
+'max_runtime_ms': int(args_config.get('max_runtime_ms', DEFAULT_MAX_RUNTIME_MS)),
'commit_interval_ms': args_config.get('commit_interval_ms', DEFAULT_COMMIT_INTERVAL_MS),
'batch_size_rows': args_config.get('batch_size_rows', DEFAULT_BATCH_SIZE_ROWS),
'batch_flush_interval_ms': args_config.get('batch_flush_interval_ms', DEFAULT_BATCH_FLUSH_INTERVAL_MS),
@@ -89,7 +98,9 @@
'encoding': args_config.get('encoding', DEFAULT_ENCODING),
'local_store_dir': args_config.get('local_store_dir', DEFAULT_LOCAL_STORE_DIR),
'local_store_batch_size_rows': args_config.get('local_store_batch_size_rows',
-DEFAULT_LOCAL_STORE_BATCH_SIZE_ROWS)
+DEFAULT_LOCAL_STORE_BATCH_SIZE_ROWS),
+'avro_schema': args_config.get('avro_schema', ''),
+'security_protocol': args_config.get('security_protocol', 'SSL')
}
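
A minimal sketch of a config exercising the two new keys, with placeholder broker, group, topic, and registry values. Note that avro_schema holds a Schema Registry URL (it is passed as schema_registry_url above), and that security_protocol defaults to 'SSL' here, unlike kafka-python's own 'PLAINTEXT' default.

# Illustrative values only: every address and name below is a placeholder.
example_args_config = {
    'bootstrap_servers': 'broker1:9093,broker2:9093',
    'group_id': 'tap_kafka_example',
    'topic': 'example_topic',
    'avro_schema': 'http://schema-registry.local:8081',  # '' (the default) keeps JSON decoding
    'security_protocol': 'SSL',                          # also the new default when omitted
}
kafka_config = generate_config(example_args_config)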


@@ -100,9 +111,15 @@ def main_impl():

if args.discover:
do_discovery(args.config)

elif args.properties:
state = args.state or {}
sync.do_sync(kafka_config, args.properties, state, fn_get_args=get_args)
elif "catalog_path" in args and args.catalog_path:
state = args.state or {}
with open(args.catalog_path) as json_file:
catalog = json.load(json_file)
sync.do_sync(kafka_config, catalog, state, fn_get_args=get_args)
else:
LOGGER.info("No properties were selected")

16 changes: 13 additions & 3 deletions tap_kafka/sync.py
@@ -11,7 +11,7 @@

from .errors import InvalidBookmarkException

-LOGGER = singer.get_logger('tap_kafka')
+LOGGER = singer.get_logger()

LOG_MESSAGES_PERIOD = 1000 # Print log messages to stderr after every nth messages
UPDATE_BOOKMARK_PERIOD = 1000 # Update and send bookmark to stdout after nth messages
@@ -69,6 +69,14 @@ def init_local_store(kafka_config):

def init_kafka_consumer(kafka_config):
LOGGER.info('Initialising Kafka Consumer...')
if "avro_schema" in kafka_config and kafka_config["avro_schema"]:
LOGGER.info(f"avro_schema value set to {kafka_config['avro_schema']}, using avro deserializer.")
from kafkian.serde.deserialization import AvroDeserializer
avd = AvroDeserializer(schema_registry_url=kafka_config['avro_schema'])
deserializer = avd.deserialize
else:
def deserializer(m):
return json.loads(m.decode(kafka_config['encoding']))
return KafkaConsumer(
# Required parameters
kafka_config['topic'],
Expand All @@ -83,9 +91,10 @@ def init_kafka_consumer(kafka_config):
max_poll_interval_ms=kafka_config['max_poll_interval_ms'],

# Non-configurable parameters
-enable_auto_commit=False,
+enable_auto_commit=True,
auto_offset_reset='earliest',
-value_deserializer=lambda m: json.loads(m.decode(kafka_config['encoding'])))
+security_protocol=kafka_config['security_protocol'],
+value_deserializer=deserializer)


def kafka_message_to_singer_record(message, topic, primary_keys):
@@ -216,6 +225,7 @@ def read_kafka_topic(consumer, local_store, kafka_config, state, fn_get_args):
local_store.insert(singer.format_message(singer.StateMessage(value=copy.deepcopy(state))))
local_store.persist_messages()
commit_kafka_consumer(consumer, message.topic, message.partition, message.offset)
+consumer.close()

return last_flush_ts
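
The consumer.close() added above runs only when the read loop returns normally. A sketch, not part of this PR, of a try/finally wrapper that would close the consumer on any exit path, including exceptions raised while polling.

# Sketch only: guarantee cleanup however the read loop exits.
def read_topic_with_cleanup(consumer, read_loop):
    """read_loop is any callable that consumes messages from `consumer`."""
    try:
        return read_loop(consumer)
    finally:
        consumer.close()  # release the connection even if read_loop raised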

Expand Down