Skip to content

Commit

Permalink
Generate kafka-python and pykafka docker image (apache#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 authored Jul 8, 2020
1 parent f36281c commit af7ccf8
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 0 deletions.
20 changes: 20 additions & 0 deletions integrations/python2-kafka-python/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM python:2

WORKDIR /usr/src/app

COPY . .

64 changes: 64 additions & 0 deletions integrations/python2-kafka-python/produceConsume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#-*- coding:utf-8 -*-

from kafka import KafkaConsumer, KafkaProducer
import os
import traceback

if __name__ == '__main__':
broker_list = os.getenv("KOP_BROKER") if os.getenv("KOP_BROKER") is not None else "localhost:9092"
topic = os.getenv("KOP_TOPIC") if os.getenv("KOP_TOPIC") is not None else "test"
should_produce = os.getenv("KOP_PRODUCE") if os.getenv("KOP_PRODUCE") is not None \
and os.getenv("KOP_PRODUCE") == 'true' else False
should_consume = os.getenv("KOP_CONSUME") if os.getenv("KOP_CONSUME") is not None \
and os.getenv("KOP_CONSUME") == 'true' else False
limit = int(os.getenv("KOP_LIMIT")) if os.getenv("KOP_LIMIT") is not None else 10
group_id = os.getenv("KOP_GROUPID") if os.getenv("KOP_GROUPID") is not None else 'test_kop'

if should_produce is True:
print "starting to produce\n"
producer = None
try:
producer = KafkaProducer(bootstrap_servers=broker_list)
msg = "hello pulsar kop, id: "
for x in range(0, limit):
producer.send(topic, msg + str(x))
print "produced all messages successfully\n"
except Exception as e:
print "Exception: " + traceback.format_exc()
raise e
finally:
if producer is not None:
producer.close()

if should_consume is True:
print "starting to consume\n"
consumer = None
try:
consumer = KafkaConsumer(topic, bootstrap_servers=broker_list, group_id=group_id, auto_offset_reset='smallest')
count = 0
for message in consumer:
count += 1
print "receive message: " + str(message.value) + '\n'
if count == limit:
break
print "consumed all messages successfully\n"
except Exception as e:
print "Exception: " + traceback.format_exc()
raise e
finally:
if consumer is not None:
consumer.close()
27 changes: 27 additions & 0 deletions integrations/python2-pykafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM python:2

RUN apt update && apt install -y git build-essential
RUN git clone https://github.com/edenhill/librdkafka.git \
&& cd librdkafka \
&& git checkout v1.2.2 \
&& ./configure --prefix /usr \
&& make -j4 \
&& make install

WORKDIR /usr/src/app

COPY . .
105 changes: 105 additions & 0 deletions integrations/python2-pykafka/produceConsume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#-*- coding:utf-8 -*-

from pykafka import KafkaClient
import os
import traceback

if __name__ == '__main__':
broker_list = os.getenv("KOP_BROKER") if os.getenv("KOP_BROKER") is not None else "localhost:9092"
topic = os.getenv("KOP_TOPIC") if os.getenv("KOP_TOPIC") is not None else "test"
should_produce = True if os.getenv("KOP_PRODUCE") is not None \
and os.getenv("KOP_PRODUCE") == 'true' else False
should_consume = True if os.getenv("KOP_CONSUME") is not None \
and os.getenv("KOP_CONSUME") == 'true' else False
limit = int(os.getenv("KOP_LIMIT")) if os.getenv("KOP_LIMIT") is not None else 10
group_id = os.getenv("KOP_GROUPID") if os.getenv("KOP_GROUPID") is not None else 'test_kop'
balance_consume = True if os.getenv("KOP_BALANCE_CONSUME") is not None \
and os.getenv("KOP_BALANCE_CONSUME") == 'true' else False
use_rdkafka = True if os.getenv("KOP_USE_RDKAFKA") is not None \
and os.getenv("KOP_USE_RDKAFKA") == 'true' else False

print broker_list
client = KafkaClient(hosts=broker_list)
myTopic = client.topics[topic]
if should_produce is True and use_rdkafka is False:
print "starting to produce\n"
with myTopic.get_sync_producer() as producer:
try:
for i in range(limit):
producer.produce("hello kop, id: " + str(i))
print "hello kop, id: " + str(i)
print "produced all messages successfully\n"
except Exception as e:
print "Exception: " + traceback.format_exc()
raise e

if should_produce is True and use_rdkafka is True:
print "starting to produce\n"
producer = None
try:
producer = myTopic.get_producer(use_rdkafka = use_rdkafka)
for i in range(limit):
producer.produce("hello, kop, id: " + str(i))
print "produced all messages successfully\n"
except Exception as e:
print "Exception: " + traceback.format_exc()
raise e
finally:
if producer is not None:
producer.close()

if should_consume is True and balance_consume is False:
print "starting to consume\n"
count = 0
consumer = None
try:
consumer = myTopic.get_simple_consumer(use_rdkafka=use_rdkafka)
for message in consumer:
if message is not None:
count += 1
print message.offset, message.value
if count == limit:
break
except Exception as e:
print "Exception: " + traceback.format_exc()
raise e
finally:
if consumer is not None:
consumer.close()

if should_consume is True and balance_consume is True:
print "starting to consume\n"
count = 0
consumer = None
try:
balanced_consumer = myTopic.get_balanced_consumer(
consumer_group=group_id,
auto_commit_enable=True,
use_rdkafka=use_rdkafka)
for message in consumer:
if message is not None:
count += 1
print message.offset, message.value
if count == limit:
break
except Exception as e:
print "Exception: " + traceback.format_exc()
raise e
finally:
if consumer is not None:
consumer.close()

0 comments on commit af7ccf8

Please sign in to comment.