Added mirror-json
JSchlarb committed Mar 14, 2024
1 parent 227afd1 commit e328987
Showing 3 changed files with 77 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yaml
@@ -35,7 +35,7 @@ jobs:
       - name: build container images
         run: >-
           ./mvnw spring-boot:build-image -q -B -DskipTests
-          -Dsha="-${GITHUB_SHA::6}"
+          -Dsha1="-${GITHUB_SHA::6}"
           -Dspring-boot.build-image.publish=true
           -Ddocker.publishRegistry.username=${{ github.actor }}
           -Ddocker.publishRegistry.password=${{ secrets.GITHUB_TOKEN }}
15 changes: 15 additions & 0 deletions pom.xml
@@ -25,6 +25,16 @@
    <docker.publishRegistry.username></docker.publishRegistry.username>
    <docker.publishRegistry.password></docker.publishRegistry.password>
  </properties>
+  <repositories>
+    <repository>
+      <id>central</id>
+      <url>https://repo1.maven.org/maven2/</url>
+    </repository>
+    <repository>
+      <id>confluent</id>
+      <url>https://packages.confluent.io/maven/</url>
+    </repository>
+  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.jetbrains.kotlin</groupId>
@@ -34,6 +44,11 @@
      <groupId>org.jetbrains.kotlin</groupId>
      <artifactId>kotlin-stdlib</artifactId>
    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-protobuf-serializer</artifactId>
+      <version>7.4.1</version>
+    </dependency>
    <dependency>
      <groupId>org.springframework.shell</groupId>
      <artifactId>spring-shell-starter</artifactId>
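Note: the kafka-protobuf-serializer artifact is published to Confluent's Maven repository (packages.confluent.io), not to Maven Central, which is why this commit introduces an explicit <repositories> section; Central is re-declared next to it so every other dependency keeps resolving as before.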
71 changes: 61 additions & 10 deletions src/main/kotlin/de/denktmit/kafka/command/KafkaTopicMirrorCommand.kt
@@ -1,11 +1,18 @@
package de.denktmit.kafka.command

+import com.google.protobuf.DynamicMessage
+import com.google.protobuf.util.JsonFormat
import de.denktmit.kafka.config.KafkaCliConfiguration
import de.denktmit.kafka.utils.getTopics
import de.denktmit.kafka.utils.logEveryNthObservable
import de.denktmit.kafka.utils.logThroughputEveryDuration
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
@@ -36,15 +43,14 @@ import java.util.concurrent.CountDownLatch
class KafkaTopicMirrorCommand(
    var receiverOptions: ReceiverOptions<ByteBuffer, ByteBuffer>,
    val senderOptions: SenderOptions<ByteBuffer, ByteBuffer>,
-    kafkaProperties: KafkaProperties,
+    val kafkaProperties: KafkaProperties,
    val config: KafkaCliConfiguration,
-    val mirrorConfig: MirrorConfig
+    val mirrorConfig: MirrorConfig,
) : AbstractShellComponent() {
    companion object {
        val LOGGER: Logger = LoggerFactory.getLogger(KafkaTopicMirrorCommand::class.java)
    }

-
    val consumerAdminClient: AdminClient = AdminClient.create(kafkaProperties.buildConsumerProperties(null))
    val producerAdminClient: AdminClient = AdminClient.create(kafkaProperties.buildProducerProperties(null))
@@ -60,8 +66,58 @@ class KafkaTopicMirrorCommand(
        scheduler.dispose()
    }

+    @ShellMethod(key = ["mirror-json"], value = "unidirectional topic mirroring, converting protobuf values to JSON")
+    fun mirrorJson() {
+        val printer = JsonFormat.printer()
+
+        val deserializer = KafkaProtobufDeserializer<DynamicMessage>(schemaRegistry())
+
+        mirror {
+            val rawMsg = it.value()
+            rawMsg.position(0)
+            val msgArray = ByteArray(rawMsg.remaining())
+            rawMsg.get(msgArray)
+
+            val msg = printer.print(deserializer.deserialize(it.topic(), msgArray)).toByteArray()
+
+            SenderRecord.create(
+                ProducerRecord(
+                    it.topic(),
+                    it.partition(),
+                    it.timestamp(),
+                    it.key(),
+                    ByteBuffer.wrap(msg),
+                    it.headers()
+                ), null
+            )
+        }
+    }
+
+    private fun schemaRegistry(): SchemaRegistryClient? {
+        val configs = kafkaProperties.buildAdminProperties(null)
+
+        val urlString = configs["schema.registry.url"] as String
+        val urls = urlString.split(",".toRegex()).dropLastWhile { it.isEmpty() }
+
+        return SchemaRegistryClientFactory.newClient(
+            urls,
+            100_000,
+            listOf(ProtobufSchemaProvider()),
+            configs,
+            emptyMap()
+        )
+    }
+
    @ShellMethod(key = ["mirror"], value = "unidirectional topic mirroring")
-    fun mirror() {
+    fun mirrorRaw() {
+        mirror {
+            SenderRecord.create(
+                ProducerRecord(it.topic(), it.partition(), it.timestamp(), it.key(), it.value(), it.headers()), null
+            )
+        }
+    }
+
+    fun mirror(converter: (ConsumerRecord<ByteBuffer, ByteBuffer>) -> SenderRecord<ByteBuffer, ByteBuffer, Nothing?>?) {
        val flux = Flux.fromIterable(replicateTopicConfigs()).flatMap { partitions ->
            val opts = receiverOptions.assignment(partitions)
            // .addAssignListener { onAssign -> onAssign.forEach { assign -> assign.seekToBeginning(); } }
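Read in isolation, the per-record work of mirrorJson amounts to the sketch below. The standalone helper protobufToJson is illustrative, not part of the commit, and assumes an already configured SchemaRegistryClient such as the one schemaRegistry() builds:

import com.google.protobuf.DynamicMessage
import com.google.protobuf.util.JsonFormat
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
import java.nio.ByteBuffer

// Decode one Confluent-framed protobuf value (magic byte + schema id + payload)
// and re-encode it as JSON bytes: essentially the work mirrorJson does per record.
fun protobufToJson(registry: SchemaRegistryClient, topic: String, value: ByteBuffer): ByteBuffer {
    val deserializer = KafkaProtobufDeserializer<DynamicMessage>(registry)
    val printer = JsonFormat.printer()

    value.position(0)                         // rewind; the buffer may already have been read
    val bytes = ByteArray(value.remaining())  // the deserializer expects a byte[], not a ByteBuffer
    value.get(bytes)

    val message = deserializer.deserialize(topic, bytes)  // schema is fetched from the registry by id
    return ByteBuffer.wrap(printer.print(message).toByteArray())
}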
@@ -77,12 +133,7 @@ class KafkaTopicMirrorCommand(
                LOGGER.error("Unable to consume topic", ex)
                Flux.empty()
            }
-            .map {
-                SenderRecord.create(
-                    ProducerRecord(it.topic(), it.partition(), it.timestamp(), it.key(), it.value(), it.headers()),
-                    null
-                )
-            }
+            .map(converter)
            .doFinally { countDownLatch.countDown() }
            .logEveryNthObservable(
                { rn, _ -> LOGGER.info("[{}] consumed {} msgs ", topicName, rn) },
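Factoring out mirror(converter) lets both shell commands share the consume/produce pipeline, with the converter as the only variation point. A session might then look like this, assuming Spring Shell's default prompt:

shell:> mirror         # byte-for-byte mirroring, behavior unchanged
shell:> mirror-json    # mirrors topics while rewriting protobuf values as JSON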