# kafka-es-indexer.properties (from BigDataDevs/kafka-elasticsearch-consumer)
### Kafka properties ####################################
# all properties starting with this prefix will be added to the KafkaProperties object
# with the property name = original property name minus the prefix
kafka.consumer.property.prefix=consumer.kafka.property.
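# For example (a hypothetical entry, not part of this config), a line such as
#   consumer.kafka.property.fetch.min.bytes=1
# would be passed to the Kafka consumer as the standard property "fetch.min.bytes=1"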
# Kafka Brokers host:port list: <host1>:<port1>,…,<hostN>:<portN>
# default: localhost:9092
# old: kafka.consumer.brokers.list=localhost:9092
consumer.kafka.property.bootstrap.servers=localhost:9092
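# Example with several brokers (hypothetical host names):
# consumer.kafka.property.bootstrap.servers=kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092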
# Kafka Consumer group name prefix -
# each indexer job will have a clientId = kafka.consumer.group.name + "_" + partitionNumber
# default: kafka_es_indexer
# old: kafka.consumer.group.name=kafka_es_indexer
consumer.kafka.property.group.id=kafka-batch-consumer
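# For example, with the group name above and a topic with 3 partitions, the indexer jobs would get
# clientIds kafka-batch-consumer_0, kafka-batch-consumer_1 and kafka-batch-consumer_2
# (assuming the group.id value above is used as the prefix)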
# kafka session timeout in ms - if the kafka broker does not get a heartbeat from a consumer during this interval,
# the consumer is marked as 'dead' and re-balancing kicks off
# default: 30s x 1000 = 30000 ms
# old: kafka.consumer.session.timeout.ms=30000
consumer.kafka.property.session.timeout.ms=30000
# Max number of bytes to fetch in one poll request PER partition
# default: 1M = 1048576
# old: kafka.consumer.max.partition.fetch.bytes=1048576
consumer.kafka.property.max.partition.fetch.bytes=1048576
# application instance name:
# used as a common name prefix for all consumer threads
application.id=app1
# Kafka Topic from which messages will be consumed and processed
# mandatory property, no default value specified.
kafka.consumer.source.topic=my_log_topic
# Number of consumer threads
kafka.consumer.pool.count=5
# time in ms to wait for new messages to arrive when calling poll() on Kafka brokers, if there are no messages right away
# WARNING: make sure this value is not higher than consumer.kafka.property.session.timeout.ms !!!
# default: 10 sec = 10 x 1000 = 10000 ms
kafka.consumer.poll.interval.ms=10000
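# For example, with the session timeout of 30000 ms configured above, any poll interval up to
# 30000 ms (such as the 10000 ms default) satisfies the warning; a larger value would risk the
# consumer being considered dead between polls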
# number of times the poll records will be re-processed in the event of a recoverable exception
# from the IBatchMessageProcessor.beforeCommitCallBack() method
kafka.consumer.poll.retry.limit=5
# time delay in ms before retries of the poll records in the event of a recoverable exception
# from the IBatchMessageProcessor.beforeCommitCallBack() method
kafka.consumer.poll.retry.delay.interval.ms=1000
# in the case when the max limit of recoverable exceptions is reached:
# if set to TRUE - ignore the exception and continue processing the next poll()
# if set to FALSE - throw ConsumerUnrecoverableException and shut down the Consumer
kafka.consumer.ignore.overlimit.recoverable.errors=false
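# With the values above (retry limit = 5, retry delay = 1000 ms), a batch that keeps throwing a
# recoverable exception is retried up to 5 times, roughly 1 second apart (about 5 extra seconds
# in total), before the over-limit behavior above applies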
### ElasticSearch properties ####################################
# ElasticSearch Host and Port List for all the nodes
# Example: elasticsearch.hosts.list=machine_1_ip:9300,machine_2_ip:9300
elasticsearch.hosts.list=localhost:9300
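# Note: 9300 in the examples above is the default ES transport port; the HTTP REST port is
# usually 9200 - use whichever port your ES nodes expose for the transport protocol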
# Name of the ElasticSearch Cluster that messages will be posted to;
# Tip: it's not a good idea to use the default name "elasticsearch" as your cluster name.
elasticsearch.cluster.name=KafkaESCluster
# ES Index Name that messages will be posted/indexed to; this can be customized by using a custom IndexHandler implementation class
# Default: "kafkaESIndex"
elasticsearch.index.name=kafkaESIndex
# ES Index Type that messages will be posted/indexed to; this can be customized by using a custom IndexHandler implementation class
# Default: "kafkaESType"
elasticsearch.index.type=kafkaESType
# Sleep time in ms between re-attempts of sending a batch to ES, in case of a SERVICE_UNAVAILABLE response
# Default: 10000
elasticsearch.reconnect.attempt.wait.ms=10000
# number of times to try to re-connect to ES when performing batch indexing, if the connection to ES fails
elasticsearch.indexing.retry.attempts=2
# sleep time in ms between attempts to connect to ES
elasticsearch.indexing.retry.sleep.ms=10000
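# With the values above (2 retry attempts, 10000 ms sleep), a failed batch is re-attempted twice
# with a 10 second pause before each attempt - roughly 20 extra seconds before the batch is given up on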