diff --git a/README.md b/README.md
index 014de84..8defd6e 100644
--- a/README.md
+++ b/README.md
@@ -3,36 +3,13 @@
 ## Purpose
 This ansible role installs kafka cluster or standalone instance based on KRaft protocol.
 
-## Configuration
-
-| Variable | Description | Default |
-| ------ | ------ | ------ |
-| kafka_version | Kafka version | 3.4.0 |
-| kafka_scala_version | Kafka scala version | 2.13 |
-| kafka_openjdk_version | Kafka OpenJDK version | 17 |
-| kafka_download_url | Kafka archive download url | `https://dlcdn.apache.org/kafka/{{ kafka_version }}/kafka_{{ kafka_scala_version }}-{{ kafka_version }}.tgz` |
-| kafka_hosts_group | Kafka hosts group in ansible inventory | kafka |
-| kafka_user | Kafka user | kafka |
-| kafka_group | Kafka group | kafka |
-| kafka_config_directory | Kafka config directory | `/etc/kafka` |
-| kafka_data_directory | Kafka data directory | `/var/lib/kafka` |
-| kafka_log_directory | Kafka log directory | `/var/log/kafka` |
-| kafka_extra_files | Defines extra files with some content. Files are located in `kafka_config_directory` | {} |
-| kafka_extra_envs | Defines extra environment variables | {} |
-| kafka_server_properties | Defines extra parameters in server.properties config | {} |
-| kafka_log4j_properties | Overrides default log4j.properties | "" |
-| kafka_sasl_enabled | Enables/Disables SASL | false |
-| kafka_password | Password of kafka user (If SASL is enabled) | "changeMe" |
-| kafka_users | Defines other users if it is required (if SASL is enabled) | `{admin.password: "changeMe"}` |
-| kafka_opts | Defines KAFKA_OPTS environment variable | "" |
-
 ## Example of inventory and playbook
 1) inventory file
 ```ini
 [kafka]
-kafka-1.example.com kafka_node_id=1
-kafka-2.example.com kafka_node_id=2
-kafka-3.example.com kafka_node_id=3
+kafka-1.example.com kafka_node_id=1 kafka_process_roles=broker,controller
+kafka-2.example.com kafka_node_id=2 kafka_process_roles=broker,controller
+kafka-3.example.com kafka_node_id=3 kafka_process_roles=broker,controller
 ```
 2) Playbook
 ```yaml
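The README's playbook snippet lies outside the hunk above; for orientation, a minimal play wiring this inventory to the role might look like the sketch below (the play name, the role name `kafka`, and the chosen variable values are assumptions for illustration, not taken from the repository):

```yaml
# Hypothetical playbook sketch — role name and variable values are illustrative only.
- name: Deploy a KRaft-based Kafka cluster
  hosts: kafka
  become: true
  vars:
    kafka_version: 3.6.0        # matches the new default in defaults/main.yaml
    kafka_sasl_enabled: true    # optional: enables the SASL branch of server.properties.j2
    kafka_acl_enabled: true     # optional: new toggle introduced by this change
  roles:
    - kafka
```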
diff --git a/defaults/main.yaml b/defaults/main.yaml
index c939de5..136b14b 100644
--- a/defaults/main.yaml
+++ b/defaults/main.yaml
@@ -1,9 +1,8 @@
 ---
-kafka_version: 3.4.0
+kafka_version: 3.6.0
 kafka_scala_version: 2.13
 kafka_openjdk_version: 17
 kafka_download_url: https://dlcdn.apache.org/kafka/{{ kafka_version }}/kafka_{{ kafka_scala_version }}-{{ kafka_version }}.tgz
-
 # must be defined as same as ansible inventory group
 kafka_hosts_group: kafka
 kafka_user: kafka
@@ -12,173 +11,52 @@ kafka_config_directory: /etc/kafka
 kafka_data_directory: /var/lib/kafka
 kafka_log_directory: /var/log/kafka
 kafka_extra_files: {}
-# prometheus-jmx-exporter.yml: |
-#   ---
-#   lowercaseOutputName: true
-#   lowercaseOutputLabelNames: true
-#   whitelistObjectNames: ["kafka.controller:*", "kafka.log:*", "kafka.network:*", "kafka.server:*"]
-#   rules:
-#     # kafka.controller:*
-#     - pattern: kafka.controller<>Count
-#       name: jmx_kafka_controller_$1_total
-#       type: COUNTER
-#       help: ""
-#     - pattern: kafka.controller<>Value
-#       name: jmx_kafka_controller_activecontroller_total
-#       type: GAUGE
-#       help: ""
-#     # kafka.log:*
-#     - pattern: kafka.log<>Count
-#       name: jmx_kafka_log_flush_total
-#       type: COUNTER
-#       help: ""
-#     - pattern: kafka.log<>Value
-#       name: jmx_kafka_log_offlinedirectory_total
-#       type: GAUGE
-#       help: ""
-#     # kafka.network:*
-#     - pattern: kafka.network<>Count
-#       name: jmx_kafka_network_errors_total
-#       type: COUNTER
-#       labels:
-#         request: "$1"
-#         error: "$2"
-#       help: ""
-#     - pattern: kafka.network<>Count
-#       name: jmx_kafka_network_requests_total
-#       type: COUNTER
-#       labels:
-#         request: "$1"
-#         version: "$2"
-#       help: ""
-#     - pattern: kafka.network<>Count
-#       name: jmx_kafka_network_$1_total
-#       type: COUNTER
-#       labels:
-#         request: "$2"
-#       help: ""
-#     - pattern: kafka.network<>Count
-#       name: jmx_kafka_network_$1time_seconds
-#       valueFactor: 0.001
-#       type: COUNTER
-#       labels:
-#         request: "$2"
-#       help: ""
-#     - pattern: kafka.network<>Value
-#       name: jmx_kafka_network_$1_$2
-#       type: GAUGE
-#       help: ""
-#     # kafka.server:*
-#     - pattern: kafka.server<>Value
-#       name: jmx_kafka_server_broker_state
-#       type: GAUGE
-#       help: "Broker state: 1=Starting, 2=RecoveringFromUncleanShutdown, 3=RunningAsBroker, 4=RunningAsController, 6=PendingControlledShutdown, 7=BrokerShuttingDown"
-#     - pattern: kafka.server<>Count
-#       name: jmx_kafka_server_brokertopic_$1_total
-#       type: COUNTER
-#       help: ""
-#     - pattern: kafka.server<>Count
-#       name: jmx_kafka_server_brokertopic_$1_total
-#       type: COUNTER
-#       labels:
-#         topic: $2
-#       help: ""
-#     - pattern: kafka.server<>Value
-#       name: jmx_kafka_server_delayedoperation_total
-#       type: GAUGE
-#       labels:
-#         purgatory: $1
-#       help: ""
-#     - pattern: kafka.server<>Count
-#       name: jmx_kafka_server_$1_total
-#       type: COUNTER
-#       help: ""
-#     - pattern: kafka.server<>Value
-#       name: jmx_kafka_server_$1_total
-#       type: GAUGE
-#       help: ""
-#     - pattern: kafka.server<>queue-size
-#       name: jmx_kafka_server_queue_total
-#       type: GAUGE
-#       labels:
-#         type: $1
-#       help: ""
-#     - pattern: kafka.server<>Value
-#       name: jmx_kafka_server_replicafetcherlag_total
-#       type: GAUGE
-#       labels:
-#         thread: $1
-#         topic: $2
-#         partition: $3
-#       help: "Lag in messages per follower replica"
-#     - pattern: kafka.server<>OneMinuteRate
-#       name: jmx_kafka_server_requesthandleravgidle_seconds
-#       type: GAUGE
-#       valueFactor: 0.000000001
-#       help: "The average fraction of time the request handler threads are idle"
-#     - pattern: kafka.server<>Value
-#       name: jmx_kafka_server_replicafetcherlag_max
-#       type: GAUGE
-#       help: "Max lag in messages btw follower and leader replicas"
-#     - pattern: kafka.server<>Count
-#       name: jmx_kafka_server_zookeeper_connection_total
-#       type: COUNTER
-#       labels:
-#         event_type: $1
-#       help: "ZooKeeper connection statuses"
-#     - pattern: kafka.server<>Count
-#       name: jmx_kafka_server_zookeeper_requests_total
-#       type: COUNTER
-#       help: "ZooKeeper client requests"
-#     - pattern: kafka.server<>(.+)-count
-#       name: jmx_kafka_server_socket_$3_total
-#       type: COUNTER
-#       labels:
-#         listener: $1
-#         thread: $2
-#       help: ""
-#     - pattern: kafka.server<>(.+)-total
-#       name: jmx_kafka_server_socket_$3_total
-#       type: GAUGE
-#       labels:
-#         listener: $1
-#         thread: $2
-#       help: ""
-
 kafka_extra_envs: {}
-# KAFKA_HEAP_OPTS: -Xms4G -Xmx4G
-# KAFKA_JVM_PERFORMANCE_OPTS: -server -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -Djava.awt.headless=true -Duser.timezone=Europe/Moscow
-
+ #KAFKA_HEAP_OPTS: -Xms4G -Xmx4G
+ #KAFKA_JVM_PERFORMANCE_OPTS: -server -XX:MetaspaceSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Duser.timezone=Europe/Moscow
+# https://kafka.apache.org/documentation/#brokerconfigs_process.roles
+kafka_process_roles: broker,controller
 kafka_server_properties: {}
-# controller.quorum.election.backoff.max.ms: 2000
-# auto.create.topics.enable: 'false'
-# default.replication.factor: 2
-# min.insync.replicas: 2
-# message.max.bytes: 10485760
-# num.network.threads: 128
-# num.io.threads: 256
-# num.partitions: 1
-# num.replica.fetchers: 3
-# num.recovery.threads.per.data.dir: 2
-# socket.send.buffer.bytes: -1
-# socket.receive.buffer.bytes: -1
-# socket.request.max.bytes: 104857600
-# offsets.topic.num.partitions: 20
-# offsets.topic.replication.factor: 3
-# transaction.state.log.num.partitions: 20
-# transaction.state.log.replication.factor: 3
-# log.retention.hours: 168
-# log.retention.bytes: -1
-# log.segment.bytes: 104857600
-# log.retention.check.interval.ms: 60000
-# log.flush.scheduler.interval.ms: 1000
-
+ #auto.create.topics.enable: 'false'
+ #default.replication.factor: 2
+ #min.insync.replicas: 2
+ #message.max.bytes: 10485760
+ #num.network.threads: 128
+ #num.io.threads: 256
+ #num.partitions: 1
+ #num.replica.fetchers: 3
+ #num.recovery.threads.per.data.dir: 2
+ #socket.send.buffer.bytes: -1
+ #socket.receive.buffer.bytes: -1
+ #socket.request.max.bytes: 104857600
+ #offsets.topic.num.partitions: 20
+ #offsets.topic.replication.factor: 3
+ #transaction.state.log.num.partitions: 20
+ #transaction.state.log.replication.factor: 3
+ #log.retention.hours: 168
+ #log.retention.bytes: -1
+ #log.segment.bytes: 104857600
+ #log.retention.check.interval.ms: 60000
+ #log.flush.scheduler.interval.ms: 1000
 kafka_log4j_properties: ""
 # enables SASL authorization
 kafka_sasl_enabled: false
+# enables ACLs
+# https://kafka.apache.org/documentation/#security_authz
+kafka_acl_enabled: false
+# defines a password of default kafka superuser
 kafka_password: "changeMe"
+# defines extra users
 kafka_users:
   admin:
+    superuser: true
     password: changeMe
+  foo:
+    superuser: false
+    password: bar
 # kafka_opts provides you with an opportunity to define KAFKA_OPTS environment variable
-kafka_opts: "" # -javaagent:/opt/prometheus/jmx_javaagent.jar=9071:/etc/kafka/prometheus-jmx-exporter.yml
+kafka_opts: ""
+# enables mirror maker if you need to migrate some data between two or more kafka clusters
+# https://kafka.apache.org/documentation/#mirrormakerconfigs
+kafka_mirror_maker_enabled: false
+kafka_mirror_maker_properties: {}
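To show how the reworked defaults are intended to be overridden, a `group_vars/kafka.yaml` along these lines would exercise the new SASL/ACL and per-user settings (usernames, passwords, and the vaulted variables are placeholders, not part of the role):

```yaml
# Illustrative overrides only — every value here is an example, not a recommendation.
kafka_process_roles: broker,controller
kafka_sasl_enabled: true
kafka_acl_enabled: true
kafka_users:
  admin:
    superuser: true
    password: "{{ vault_kafka_admin_password }}"       # hypothetical vaulted secret
  app_writer:
    superuser: false
    password: "{{ vault_kafka_app_writer_password }}"  # hypothetical vaulted secret
kafka_server_properties:
  default.replication.factor: 3
  min.insync.replicas: 2
```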
diff --git a/handlers/main.yaml b/handlers/main.yaml
index de7a065..2a07cba 100644
--- a/handlers/main.yaml
+++ b/handlers/main.yaml
@@ -5,3 +5,10 @@
     daemon_reload: true
     enabled: true
     state: restarted
+
+- name: Restart kafka-mirror-maker
+  ansible.builtin.systemd:
+    name: kafka-mirror-maker
+    daemon_reload: true
+    enabled: true
+    state: restarted
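The `kafka-mirror-maker` handler added above is driven by the new `kafka_mirror_maker_*` variables; a cross-cluster replication setup might be sketched as follows (cluster aliases and bootstrap addresses are placeholders, and the keys follow the upstream MirrorMaker 2 documentation rather than anything defined in this role):

```yaml
# Hypothetical MirrorMaker 2 settings, rendered verbatim into connect-mirror-maker.properties.
kafka_mirror_maker_enabled: true
kafka_mirror_maker_properties:
  clusters: "source, target"
  source.bootstrap.servers: "old-kafka-1:9092,old-kafka-2:9092"
  target.bootstrap.servers: "kafka-1.example.com:9092,kafka-2.example.com:9092"
  source->target.enabled: "true"
  source->target.topics: ".*"
  replication.factor: 3
```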
diff --git a/meta/main.yaml b/meta/main.yaml
index f1787df..e6d9c93 100644
--- a/meta/main.yaml
+++ b/meta/main.yaml
@@ -1,7 +1,7 @@
 ---
 dependencies: []
 galaxy_info:
-  description: A purpose of a role is to deploy kafka cluster/standalone instance
+  description: This ansible role installs kafka cluster or standalone instance based on KRaft protocol.
   min_ansible_version: 2.13
   platforms:
     - name: Ubuntu
diff --git a/tasks/install_kafka.yaml b/tasks/install_kafka.yaml
index df1097b..d10c60a 100644
--- a/tasks/install_kafka.yaml
+++ b/tasks/install_kafka.yaml
@@ -22,7 +22,7 @@
     owner: "{{ kafka_user }}"
     group: "{{ kafka_group }}"
     mode: "0750"
-  with_items:
+  loop:
     - "{{ kafka_config_directory }}"
     - "{{ kafka_data_directory }}"
    - "{{ kafka_log_directory }}"
@@ -105,6 +105,16 @@
     mode: "0644"
   notify: Restart kafka
 
+- name: Kafka | Create mirror maker config
+  ansible.builtin.template:
+    src: connect-mirror-maker.properties.j2
+    dest: "{{ kafka_config_directory }}/connect-mirror-maker.properties"
+    owner: "{{ kafka_user }}"
+    group: "{{ kafka_group }}"
+    mode: "0640"
+  notify: Restart kafka-mirror-maker
+  when: kafka_mirror_maker_enabled
+
 - name: Kafka | Check Cluster UUID
   ansible.builtin.stat:
     path: "{{ kafka_config_directory }}/cluster_uuid"
@@ -154,9 +164,27 @@
     mode: "0644"
   notify: Restart kafka
 
+- name: Kafka | Create mirror maker systemd service
+  ansible.builtin.template:
+    src: kafka-mirror-maker.service.j2
+    dest: /etc/systemd/system/kafka-mirror-maker.service
+    owner: root
+    group: root
+    mode: "0640"
+  notify: Restart kafka-mirror-maker
+  when: kafka_mirror_maker_enabled
+
 - name: Kafka | Start service
   ansible.builtin.systemd:
     name: kafka
     daemon_reload: true
     enabled: true
     state: started
+
+- name: Kafka | Start mirror maker systemd service
+  ansible.builtin.systemd:
+    name: kafka-mirror-maker
+    daemon_reload: true
+    enabled: true
+    state: started
+  when: kafka_mirror_maker_enabled
diff --git a/tasks/main.yaml b/tasks/main.yaml
index 0b33bcc..588ad36 100644
--- a/tasks/main.yaml
+++ b/tasks/main.yaml
@@ -1,8 +1,10 @@
 ---
-- ansible.builtin.import_tasks: install_openjdk.yaml
+- name: Install openjdk
+  ansible.builtin.import_tasks: install_openjdk.yaml
   tags:
     - java
 
-- ansible.builtin.import_tasks: install_kafka.yaml
+- name: Install kafka
+  ansible.builtin.import_tasks: install_kafka.yaml
   tags:
     - kafka
diff --git a/templates/connect-mirror-maker.properties.j2 b/templates/connect-mirror-maker.properties.j2
new file mode 100644
index 0000000..e9fdb9d
--- /dev/null
+++ b/templates/connect-mirror-maker.properties.j2
@@ -0,0 +1,5 @@
+{% if kafka_mirror_maker_properties is defined and kafka_mirror_maker_properties %}
+{% for key, value in kafka_mirror_maker_properties.items() %}
+{{ key }}={{ value }}
+{% endfor %}
+{% endif %}
diff --git a/templates/kafka-mirror-maker.service.j2 b/templates/kafka-mirror-maker.service.j2
new file mode 100644
index 0000000..0d976fb
--- /dev/null
+++ b/templates/kafka-mirror-maker.service.j2
@@ -0,0 +1,17 @@
+[Unit]
+Description=Apache Kafka Mirror Maker
+Wants=network.target
+After=network.target
+
+[Service]
+Type=simple
+User={{ kafka_user }}
+Group={{ kafka_group }}
+ExecStart=/opt/kafka/bin/connect-mirror-maker.sh {{ kafka_config_directory }}/connect-mirror-maker.properties
+ExecStop=/bin/kill $MAINPID
+Restart=always
+RestartSec=10
+KillMode=process
+
+[Install]
+WantedBy=multi-user.target
diff --git a/templates/kafka.service.j2 b/templates/kafka.service.j2
index ad6e0cb..8f5b4a9 100644
--- a/templates/kafka.service.j2
+++ b/templates/kafka.service.j2
@@ -8,7 +8,7 @@ Type=simple
 User={{ kafka_user }}
 Group={{ kafka_group }}
 Environment="LOG_DIR={{ kafka_log_directory }}"
-Environment="KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:{{ kafka_config_directory }}/log4j.properties
+Environment="KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:{{ kafka_config_directory }}/log4j.properties"
 {% if kafka_sasl_enabled or kafka_opts %}
 Environment="KAFKA_OPTS={% if kafka_sasl_enabled %}-Djava.security.auth.login.config={{ kafka_config_directory }}/jaas.conf{% endif %} {% if kafka_opts %}{{ kafka_opts }}{% endif %}"
 {% endif %}
@@ -24,4 +24,4 @@ RestartSec=10
 KillMode=process
 
 [Install]
-WantedBy=multi-user.target
\ No newline at end of file
+WantedBy=multi-user.target
diff --git a/templates/server.properties.j2 b/templates/server.properties.j2
index ffcd0a6..d2456fd 100644
--- a/templates/server.properties.j2
+++ b/templates/server.properties.j2
@@ -1,25 +1,44 @@
 # {{ ansible_managed }}
 {% set kafka_hosts = groups[kafka_hosts_group] %}
-process.roles=broker,controller
-{% if kafka_hosts | length == 3 %}
+process.roles={{ kafka_process_roles }}
+{% if kafka_hosts | length >= 3 %}
+{%- set kafka_controllers = [] -%}
+{%- for host in kafka_hosts -%}
+{%- if 'controller' in hostvars[host]['kafka_process_roles'] -%}
+{%- set node_id = hostvars[host]['kafka_node_id'] | string -%}
+{%- set node_address = hostvars[host]['ansible_host'] | default(hostvars[host]['ansible_default_ipv4']['address']) -%}
+{%- set controller = node_id + '@' + node_address + ':9093' -%}
+{%- set _ = kafka_controllers.append(controller) -%}
+{%- endif -%}
+{%- endfor %}
 node.id={{ kafka_node_id }}
-controller.quorum.voters={% for host in kafka_hosts %}{{ hostvars[host]['kafka_node_id'] }}@{{ hostvars[host]['ansible_host'] | default(hostvars[host]['ansible_default_ipv4']['address']) }}:9093{{ ',' if not loop.last }}{% endfor %}
+controller.quorum.voters={% for controller in kafka_controllers %}{{ controller }}{{ ',' if not loop.last }}{% endfor %}
 {% else %}
 node.id=1
 controller.quorum.voters=1@localhost:9093
 {% endif %}
+listeners={% for listener in kafka_process_roles.split(',') %}{% if listener == 'broker' %}BROKER://:9092{% elif listener == 'controller' %}CONTROLLER://:9093{% endif %}{{ ',' if not loop.last }}{% endfor %}
+
+inter.broker.listener.name=BROKER
 {% if kafka_sasl_enabled %}
-listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
-inter.broker.listener.name=SASL_PLAINTEXT
 sasl.enabled.mechanisms=PLAIN
+sasl.mechanism.controller.protocol=PLAIN
 sasl.mechanism.inter.broker.protocol=PLAIN
-{% else %}
-listeners=PLAINTEXT://:9092,CONTROLLER://:9093
-inter.broker.listener.name=PLAINTEXT
 {% endif %}
 controller.listener.names=CONTROLLER
-listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+listener.security.protocol.map=CONTROLLER:{{ 'SASL_PLAINTEXT' if kafka_sasl_enabled else 'PLAINTEXT' }},BROKER:{{ 'SASL_PLAINTEXT' if kafka_sasl_enabled else 'PLAINTEXT' }}
+{% if kafka_sasl_enabled and kafka_acl_enabled %}
+{% set kafka_superusers = [] %}
+{% for key, value in kafka_users.items() %}
+{% if value.superuser is defined and value.superuser | bool %}
+{% set _ = kafka_superusers.append(key) %}
+{% endif %}
+{% endfor %}
+authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+super.users=User:kafka;{% for superuser in kafka_superusers %}User:{{ superuser }}{{ ';' if not loop.last }}{% endfor %}
+{% endif %}
+
 log.dirs={{ kafka_data_directory }}
 {% if kafka_server_properties is defined and kafka_server_properties %}
 {% for key, value in kafka_server_properties.items() %}