Skip to content

Commit

Permalink
Added mirror command
Browse files Browse the repository at this point in the history
  • Loading branch information
JSchlarb committed Mar 10, 2024
1 parent 5695c0d commit beee4f7
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 26 deletions.
3 changes: 2 additions & 1 deletion src/main/kotlin/de/denktmit/kafka/KafkaCliApplication.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.denktmit.kafka

import de.denktmit.kafka.command.MirrorConfig
import de.denktmit.kafka.config.KafkaCliConfiguration
import org.springframework.boot.Banner
import org.springframework.boot.SpringApplication
Expand All @@ -8,7 +9,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties


@SpringBootApplication
@EnableConfigurationProperties(KafkaCliConfiguration::class)
@EnableConfigurationProperties(KafkaCliConfiguration::class, MirrorConfig::class)
class KafkaCliApplication

fun main(args: Array<String>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package de.denktmit.kafka.command

import de.denktmit.kafka.config.CsvConfig.Companion.CSV_FORMAT
import de.denktmit.kafka.config.KafkaCliConfiguration
import de.denktmit.kafka.utils.*
import de.denktmit.kafka.utils.b64Key
import de.denktmit.kafka.utils.b64Value
import de.denktmit.kafka.utils.logEveryNthObservable
import de.denktmit.kafka.utils.writeCSV
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListOffsetsResult
import org.apache.kafka.clients.admin.OffsetSpec
import org.apache.kafka.common.TopicPartition
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.shell.standard.ShellComponent
import org.springframework.shell.standard.ShellMethod
Expand All @@ -23,16 +27,20 @@ import java.util.concurrent.atomic.AtomicLong

@ShellComponent
class KafkaTopicBackupCommand(
var receiverOptions: ReceiverOptions<ByteBuffer, ByteBuffer>,
var adminClient: AdminClient,
val receiverOptions: ReceiverOptions<ByteBuffer, ByteBuffer>,
val kafkaProperties: KafkaProperties,
val config: KafkaCliConfiguration
) {
companion object {
val LOGGER: Logger = LoggerFactory.getLogger(KafkaTopicBackupCommand::class.java)
}

fun discoverTopics(topics: List<String>) =
Flux.fromIterable(adminClient.describeTopics(topics).topicNameValues().values)

fun discoverTopics(topics: List<String>): Flux<MutableMap.MutableEntry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> {

val adminClient = AdminClient.create(kafkaProperties.buildAdminProperties(null))

return Flux.fromIterable(adminClient.describeTopics(topics).topicNameValues().values)
.flatMap { value ->
Flux.fromIterable(
adminClient.listOffsets(
Expand All @@ -42,6 +50,7 @@ class KafkaTopicBackupCommand(
.get().entries
)
}
}

fun consumeTopics(it: MutableMap.MutableEntry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>): Flux<*> {
val path = Path.of("${it.key.topic()}-${it.key.partition()}.csv")
Expand Down
169 changes: 169 additions & 0 deletions src/main/kotlin/de/denktmit/kafka/command/KafkaTopicMirrorCommand.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package de.denktmit.kafka.command

import de.denktmit.kafka.config.KafkaCliConfiguration
import de.denktmit.kafka.utils.getTopics
import de.denktmit.kafka.utils.logEveryNthObservable
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.DisconnectException
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.jline.terminal.Terminal
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import org.springframework.shell.standard.AbstractShellComponent
import org.springframework.shell.standard.ShellComponent
import org.springframework.shell.standard.ShellMethod
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderRecord
import reactor.util.retry.Retry
import java.nio.ByteBuffer
import java.time.Duration
import java.util.concurrent.CountDownLatch


@ShellComponent
class KafkaTopicMirrorCommand(
var receiverOptions: ReceiverOptions<ByteBuffer, ByteBuffer>,
val senderOptions: SenderOptions<ByteBuffer, ByteBuffer>,
kafkaProperties: KafkaProperties,
val config: KafkaCliConfiguration,
val mirrorConfig: MirrorConfig
) : AbstractShellComponent() {
companion object {
val LOGGER: Logger = LoggerFactory.getLogger(KafkaTopicMirrorCommand::class.java)
}


val consumerAdminClient: AdminClient = AdminClient.create(kafkaProperties.buildConsumerProperties(null))
val producerAdminClient: AdminClient = AdminClient.create(kafkaProperties.buildProducerProperties(null))

val countDownLatch = CountDownLatch(1)
val scheduler = Schedulers.boundedElastic()

fun subscribe(flux: Flux<*>) {
val sub = flux.doFinally { countDownLatch.countDown() }.subscribe()

terminal.handle(Terminal.Signal.INT) { sub.dispose() }

countDownLatch.await()
scheduler.dispose()
}

@ShellMethod(key = ["mirror"], value = "unidirectional topic mirroring")
fun mirror() {
val partitions = replicateTopicConfigs()

val flux = Flux.fromIterable(partitions).flatMap { partitions ->
val opts = receiverOptions.assignment(partitions)
// .addAssignListener { onAssign -> onAssign.forEach { assign -> assign.seekToBeginning(); } }

val topicName = partitions[0].topic()

val inFlux = ReactiveKafkaConsumerTemplate(opts).receiveAutoAck().subscribeOn(scheduler)
.retryWhen(
Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(10))
.filter { throwable -> throwable is DisconnectException }
)
.onErrorResume(TopicAuthorizationException::class.java) { ex: TopicAuthorizationException ->
LOGGER.error("Unable to consume topic", ex)
Flux.empty()
}
.map {
SenderRecord.create(
ProducerRecord(it.topic(), it.partition(), it.timestamp(), it.key(), it.value(), it.headers()),
null
)
}
.doFinally { countDownLatch.countDown() }
.logEveryNthObservable(
{ rn, _ -> LOGGER.info("[{}] consumed {} msgs ", topicName, rn) },
{ rn -> LOGGER.info("[{}] consumed {} msgs in total ", topicName, rn) },
10_000
)

KafkaSender.create(senderOptions).send(inFlux)
.onErrorResume(TopicAuthorizationException::class.java) { ex: TopicAuthorizationException ->
LOGGER.error("Unable to produce to topic", ex)
Flux.empty()
}
.logEveryNthObservable(
{ rn, _ -> LOGGER.info("[{}] produced {} msgs ", topicName, rn) },
{ rn -> LOGGER.info("[{}] produced {} msgs in total ", topicName, rn) },
10_000
)
}

subscribe(flux)
}

private fun topicFilter(): (String) -> Boolean {
if (mirrorConfig.topics.isNotEmpty()) {
return fun(topicName: String): Boolean = mirrorConfig.topics.contains(topicName)
}

val allowedTopicPattern = Regex(mirrorConfig.allowedTopicPattern)
return fun(topicName: String): Boolean = allowedTopicPattern.matches(topicName)
}

fun replicateTopicConfigs(): List<List<TopicPartition>> {
val allowed = Regex(mirrorConfig.allowedConfigrationPattern)
val denied = mirrorConfig.deniedConfigrationPattern?.let { Regex(it) }

val sourceTopics = consumerAdminClient.getTopics(topicFilter()).allTopicNames().get()
val targetTopics = producerAdminClient.getTopics(topicFilter()).allTopicNames().get().keys

val topicConfiguration = sourceTopics.filter {
!targetTopics.contains(it.key)
}.map { topicName -> ConfigResource(ConfigResource.Type.TOPIC, topicName.key) }

val describeConfigResult = consumerAdminClient.describeConfigs(topicConfiguration).all().get()

val newTopics = describeConfigResult.map { (key, description) ->
val topicName = key.name()
val partitions = sourceTopics[topicName]!!.partitions()

val config = description.entries()
.filter { allowed.matches(it.name()) && (denied?.matches(it.name())?.not() ?: true) }
.associate { it.name() to it.value() }

NewTopic(
topicName,
partitions.size,
partitions[0].replicas().size.toShort()
).configs(config)
}

producerAdminClient.createTopics(newTopics).all().get()

return sourceTopics.values.map { topic ->
LOGGER.info(
"Subscribe to topic {} with partitions [{}]",
topic.name(),
topic.partitions().map { it.partition() }
)

topic.partitions().map {
TopicPartition(topic.name(), it.partition())
}
}
}

}

@ConfigurationProperties(prefix = "mirror")
data class MirrorConfig(
val topics: List<String> = emptyList(),
val allowedTopicPattern: String = ".*",
val allowedConfigrationPattern: String = ".*",
val deniedConfigrationPattern: String? = "",
)
14 changes: 6 additions & 8 deletions src/main/kotlin/de/denktmit/kafka/config/KafkaConfig.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package de.denktmit.kafka.config

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.common.serialization.ByteBufferDeserializer
import org.apache.kafka.common.serialization.ByteBufferSerializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Lazy
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.sender.SenderOptions
import java.nio.ByteBuffer
Expand All @@ -14,18 +14,16 @@ import java.nio.ByteBuffer
@Configuration
class KafkaConfig {
@Bean
@Lazy
fun receiverOptions(props: KafkaProperties): ReceiverOptions<ByteBuffer, ByteBuffer> =
ReceiverOptions.create<ByteBuffer, ByteBuffer>(props.buildConsumerProperties())
ReceiverOptions.create<ByteBuffer, ByteBuffer>(props.buildConsumerProperties(null))
.withKeyDeserializer(ByteBufferDeserializer())
.withValueDeserializer(ByteBufferDeserializer())

@Bean
@Lazy
fun senderOptions(props: KafkaProperties): SenderOptions<ByteBuffer, ByteBuffer> =
SenderOptions.create<ByteBuffer, ByteBuffer>(props.buildProducerProperties())
SenderOptions.create<ByteBuffer, ByteBuffer>(props.buildProducerProperties(null))
.withKeySerializer(ByteBufferSerializer())
.withValueSerializer(ByteBufferSerializer())


@Bean
fun adminClient(props: KafkaProperties): AdminClient = AdminClient.create(props.buildAdminProperties())
}
}
13 changes: 13 additions & 0 deletions src/main/kotlin/de/denktmit/kafka/utils/AdminClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.denktmit.kafka.utils

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.DescribeTopicsResult

fun AdminClient.getTopics(filter: (String) -> Boolean): DescribeTopicsResult {
val topics = listTopics().listings().get()
.filter { topicListing -> filter(topicListing.name()) }
.map { it.name() }

return describeTopics(topics)
}

2 changes: 1 addition & 1 deletion src/main/kotlin/de/denktmit/kafka/utils/Flux.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ fun <T> Flux<T>.logEveryNthObservable(
logAfterTerminateFunction(latestRowNum.get())
}
.map { obj: Tuple2<Long?, T> -> obj.t2 }
}
}
28 changes: 17 additions & 11 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
spring:
kafka:
bootstrap-servers: localhost:9092
properties:
security.protocol: "SASL_SSL"
sasl.mechanism: PLAIN
group.id: "{GROUP_ID}"
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='${SASL_USERNAME}' password='${SASL_PASSWORD}';"
logging:
file:
name: kafka-cli.log

producer:
bootstrap-servers: "localhost:9094"

consumer:
group-id: mm
bootstrap-servers: "localhost:9092"
properties:
auto.offset.reset: earliest

application:
backupTopics: []
restoreTopics: {}
backupTopics: [ ]
restoreTopics: { }

mirror:
topics:
allowedTopicPattern: "".*"
allowedConfigrationPattern: ".*"
deniedConfigrationPattern: ""

0 comments on commit beee4f7

Please sign in to comment.