Skip to content

Commit

Permalink
Added mirror-json
Browse files Browse the repository at this point in the history
  • Loading branch information
JSchlarb committed Mar 13, 2024
1 parent 227afd1 commit af97411
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@
<docker.publishRegistry.username></docker.publishRegistry.username>
<docker.publishRegistry.password></docker.publishRegistry.password>
</properties>
<repositories>
<repository>
<id>central</id>
<url>https://repo.maven.apache.org/maven2</url>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
Expand All @@ -34,6 +43,11 @@
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.shell</groupId>
<artifactId>spring-shell-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package de.denktmit.kafka.command

import com.google.protobuf.DynamicMessage
import com.google.protobuf.util.JsonFormat
import de.denktmit.kafka.config.KafkaCliConfiguration
import de.denktmit.kafka.utils.getTopics
import de.denktmit.kafka.utils.logEveryNthObservable
import de.denktmit.kafka.utils.logThroughputEveryDuration
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
Expand Down Expand Up @@ -38,7 +42,8 @@ class KafkaTopicMirrorCommand(
val senderOptions: SenderOptions<ByteBuffer, ByteBuffer>,
kafkaProperties: KafkaProperties,
val config: KafkaCliConfiguration,
val mirrorConfig: MirrorConfig
val mirrorConfig: MirrorConfig,
val deserializer: KafkaProtobufDeserializer<DynamicMessage>,
) : AbstractShellComponent() {
companion object {
val LOGGER: Logger = LoggerFactory.getLogger(KafkaTopicMirrorCommand::class.java)
Expand All @@ -60,8 +65,36 @@ class KafkaTopicMirrorCommand(
scheduler.dispose()
}

@ShellMethod(key = ["mirror-json"], value = "unidirectional topic mirroring")
fun mirrorJson() {
    // JsonFormat.printer() is thread-safe and immutable, so one instance is
    // shared across all records of the mirror run.
    val printer = JsonFormat.printer()

    mirror { record ->
        // Extract the value bytes WITHOUT calling ByteBuffer.array():
        // array() throws for direct/read-only buffers and ignores
        // position()/arrayOffset(), so it can crash or hand the deserializer
        // the wrong byte range. duplicate() keeps the source buffer's
        // position untouched for any other reader.
        val valueView = record.value().duplicate()
        val rawBytes = ByteArray(valueView.remaining())
        valueView.get(rawBytes)

        // Protobuf -> canonical JSON, re-encoded as UTF-8 bytes for the
        // target topic. Topic name is passed so the deserializer can resolve
        // the subject/schema for this record.
        val jsonBytes = printer
            .print(deserializer.deserialize(record.topic(), rawBytes))
            .toByteArray()

        // Preserve topic, partition, timestamp, key and headers; only the
        // value payload changes representation (protobuf -> JSON).
        SenderRecord.create(
            ProducerRecord(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                ByteBuffer.wrap(jsonBytes),
                record.headers(),
            ),
            null,
        )
    }
}

@ShellMethod(key = ["mirror"], value = "unidirectional topic mirroring")
fun mirrorRaw() {
    // Pass-through mirroring: every consumed record is forwarded verbatim
    // (same topic, partition, timestamp, key, value bytes, and headers),
    // with no correlation metadata attached to the sender record.
    mirror { record ->
        val outbound = ProducerRecord(
            record.topic(),
            record.partition(),
            record.timestamp(),
            record.key(),
            record.value(),
            record.headers(),
        )
        SenderRecord.create(outbound, null)
    }
}

fun mirror(converter: (ConsumerRecord<ByteBuffer, ByteBuffer>) -> SenderRecord<ByteBuffer, ByteBuffer, Nothing?>?) {
val flux = Flux.fromIterable(replicateTopicConfigs()).flatMap { partitions ->
val opts = receiverOptions.assignment(partitions)
// .addAssignListener { onAssign -> onAssign.forEach { assign -> assign.seekToBeginning(); } }
Expand All @@ -77,12 +110,7 @@ class KafkaTopicMirrorCommand(
LOGGER.error("Unable to consume topic", ex)
Flux.empty()
}
.map {
SenderRecord.create(
ProducerRecord(it.topic(), it.partition(), it.timestamp(), it.key(), it.value(), it.headers()),
null
)
}
.map(converter)
.doFinally { countDownLatch.countDown() }
.logEveryNthObservable(
{ rn, _ -> LOGGER.info("[{}] consumed {} msgs ", topicName, rn) },
Expand Down

0 comments on commit af97411

Please sign in to comment.