Commit
Merge pull request #140 from LibertyAces/feature/confkafka
Introduce confluent-kafka instead of aiokafka.
PremyslCerny authored May 26, 2022
2 parents 836c8ed + e43d704 commit 5fa9174
Showing 8 changed files with 174 additions and 530 deletions.
2 changes: 0 additions & 2 deletions bspump/kafka/__init__.py
@@ -1,7 +1,6 @@
from .connection import KafkaConnection
from .source import KafkaSource
from .sink import KafkaSink
-from .batchsink import KafkaBatchSink
from .keyfilter import KafkaKeyFilter
from .topic_initializer import KafkaTopicInitializer

@@ -10,6 +9,5 @@
	"KafkaSource",
	"KafkaSink",
	"KafkaKeyFilter",
-	"KafkaBatchSink",
	"KafkaTopicInitializer",
]
72 changes: 0 additions & 72 deletions bspump/kafka/batchsink.py

This file was deleted.
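KafkaBatchSink was dropped without a confluent-kafka counterpart, presumably because librdkafka (which confluent-kafka wraps) already batches messages internally. A minimal sketch of tuning that built-in batching through producer configuration; the topic name and option values are illustrative assumptions, not taken from this commit:

.. code:: python

	from confluent_kafka import Producer

	# librdkafka queues and batches messages internally;
	# these two options tune that behaviour.
	producer = Producer({
		"bootstrap.servers": "localhost:9092",
		"linger.ms": 100,             # wait up to 100 ms to fill a batch
		"batch.num.messages": 10000,  # upper bound on messages per batch
	})

	for i in range(1000):
		producer.produce("events", value=b"payload-%d" % i)
		producer.poll(0)  # serve delivery callbacks without blocking

	producer.flush()  # block until every queued message is delivered
..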

92 changes: 6 additions & 86 deletions bspump/kafka/connection.py
@@ -1,7 +1,4 @@
import logging
-import re

-import aiokafka
-
from ..abc.connection import Connection

@@ -19,100 +16,23 @@ class KafkaConnection(Connection):
	.. code:: python
-		config = {"compression_type": "gzip"}
+		config = {"bootstrap_servers": "localhost:9092"}
		app = bspump.BSPumpApplication()
		svc = app.get_service("bspump.PumpService")
		svc.add_connection(
			bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
		)
	..
-	``ConfigDefaults`` options:
+	Standard Kafka configuration options can be used,
+	as specified in librdkafka library,
+	where the options are simply passed to:
-	compression_type (str): Kafka supports several compression types: ``gzip``, ``snappy`` and ``lz4``.
-		This option needs to be specified in Kafka Producer only, Consumer will decompress automatically.
-	security_protocol (str): Protocol used to communicate with brokers.
-		Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
-	sasl_mechanism (str): Authentication mechanism when security_protocol
-		is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
-		PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512. Default: PLAIN
-	sasl_plain_username (str): username for sasl PLAIN authentication.
-		Default: None
-	sasl_plain_password (str): password for sasl PLAIN authentication.
-		Default: None
+	https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
	"""

	ConfigDefaults = {
		'bootstrap_servers': 'localhost:9092',  # One or more URLs separated by whitespace or semicolon
-		'compression_type': '',
-		'security_protocol': 'PLAINTEXT',
-		'sasl_mechanism': 'PLAIN',
-		'sasl_plain_username': '',
-		'sasl_plain_password': '',
-
	}

	def __init__(self, app, id=None, config=None):
		super().__init__(app, id=id, config=config)
-		self.Loop = app.Loop

-	async def create_producer(self, **kwargs):
-		"""
-		Description:
-		:returns:
-		"""
-		producer = aiokafka.AIOKafkaProducer(
-			loop=self.Loop,
-			bootstrap_servers=self.get_bootstrap_servers(),
-			compression_type=self.get_compression(),
-			security_protocol=self.Config.get('security_protocol'),
-			sasl_mechanism=self.Config.get('sasl_mechanism'),
-			sasl_plain_username=self.Config.get('sasl_plain_username') or None,
-			sasl_plain_password=self.Config.get('sasl_plain_password') or None,
-			**kwargs
-		)
-		return producer

-	def create_consumer(self, *topics, **kwargs):
-		"""
-		Description:
-		:returns:
-		"""
-		consumer = aiokafka.AIOKafkaConsumer(
-			*topics,
-			loop=self.Loop,
-			bootstrap_servers=self.get_bootstrap_servers(),
-			enable_auto_commit=False,
-			security_protocol=self.Config.get('security_protocol'),
-			sasl_mechanism=self.Config.get('sasl_mechanism'),
-			sasl_plain_username=self.Config.get('sasl_plain_username') or None,
-			sasl_plain_password=self.Config.get('sasl_plain_password') or None,
-			**kwargs
-		)
-		return consumer

-	def get_bootstrap_servers(self):
-		"""
-		Description:
-		:returns:
-		"""
-		return [
-			url for url
-			in re.split(r"[\s;]+", self.Config['bootstrap_servers'])
-			if url
-		]

-	def get_compression(self):
-		"""
-		Description: Returns compression type to use in connection
-		:returns:
-		"""
-		compression_type = self.Config.get("compression_type")
-		if compression_type in ("", "none", "None"):
-			compression_type = None
-		return compression_type
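The confluent-kafka replacement itself is not visible in this extract; the hunk above shows only the removed aiokafka code. For orientation, here is a minimal sketch of what a librdkafka-backed connection could look like. The class and method names mirror the removed ones, but everything below is an assumption rather than the code this commit actually adds:

.. code:: python

	from confluent_kafka import Producer, Consumer

	class KafkaConnectionSketch:
		"""Illustrative stand-in, not the class added by this commit."""

		ConfigDefaults = {
			"bootstrap.servers": "localhost:9092",
		}

		def __init__(self, config=None):
			# confluent-kafka accepts a flat dict of librdkafka options,
			# so the connection can simply merge defaults and forward them.
			self.Config = dict(self.ConfigDefaults)
			if config is not None:
				self.Config.update(config)

		def create_producer(self, extra_config=None):
			conf = dict(self.Config)
			if extra_config:
				conf.update(extra_config)
			return Producer(conf)

		def create_consumer(self, *topics, group_id="bspump", extra_config=None):
			conf = dict(self.Config)
			conf["group.id"] = group_id
			conf["enable.auto.commit"] = False  # commits stay under pipeline control
			if extra_config:
				conf.update(extra_config)
			consumer = Consumer(conf)
			consumer.subscribe(list(topics))
			return consumer
..

Because confluent-kafka wraps the synchronous librdkafka C client, the ``loop=`` wiring and the ``async`` producer factory go away together with the helpers aiokafka needed: librdkafka parses ``bootstrap.servers`` (comma-separated) and ``compression.type`` on its own, which is presumably why ``get_bootstrap_servers()`` and ``get_compression()`` could be deleted.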
9 changes: 4 additions & 5 deletions bspump/kafka/keyfilter.py
@@ -23,13 +23,12 @@ def __init__(self, app, pipeline, keys, id=None, config=None):
		self.Keys = frozenset(keys)


	def process(self, context, event):
-		kafka_ctx = context.get("kafka")
-		assert (kafka_ctx is not None)
+		kafka_key = context.get("kafka_key")
+		assert (kafka_key is not None)

-		key = kafka_ctx.key
-		if key is not None and key in self.Keys:
+		if kafka_key is not None and kafka_key in self.Keys:
			return event

		else:
			return None
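With this change the filter compares the plain message key that the updated KafkaSource presumably stores in the context under ``kafka_key``, instead of reaching into an aiokafka message object via ``context["kafka"].key``. A hypothetical illustration of the new contract (the key values are made up, and ``app``/``pipeline`` are assumed to exist as in the connection docstring above):

.. code:: python

	# KafkaKeyFilter keeps the allowed keys in a frozenset and
	# passes an event through only on an exact key match.
	keyfilter = bspump.kafka.KafkaKeyFilter(
		app, pipeline, keys=[b"sensor-1", b"sensor-2"]
	)

	# Key present in the set -> the event passes through.
	assert keyfilter.process({"kafka_key": b"sensor-1"}, b'{"temp": 21}') is not None

	# Key absent from the set -> the event is dropped (None is returned).
	assert keyfilter.process({"kafka_key": b"sensor-9"}, b'{"temp": 21}') is None
..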
(Diffs for the remaining changed files were not loaded in this excerpt.)
