Skip to content

Commit

Permalink
creating schema registry instance
Browse files Browse the repository at this point in the history
  • Loading branch information
JSchlarb committed Mar 13, 2024
1 parent cc338b5 commit 30330b4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
- name: build container images
run: >-
./mvnw spring-boot:build-image -q -B -DskipTests
-Dsha="-${GITHUB_SHA::6}"
-Dsha1="-${GITHUB_SHA::6}"
-Dspring-boot.build-image.publish=true
-Ddocker.publishRegistry.username=${{ github.actor }}
-Ddocker.publishRegistry.password=${{ secrets.GITHUB_TOKEN }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import de.denktmit.kafka.config.KafkaCliConfiguration
import de.denktmit.kafka.utils.getTopics
import de.denktmit.kafka.utils.logEveryNthObservable
import de.denktmit.kafka.utils.logThroughputEveryDuration
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
Expand Down Expand Up @@ -40,16 +43,14 @@ import java.util.concurrent.CountDownLatch
class KafkaTopicMirrorCommand(
var receiverOptions: ReceiverOptions<ByteBuffer, ByteBuffer>,
val senderOptions: SenderOptions<ByteBuffer, ByteBuffer>,
kafkaProperties: KafkaProperties,
val kafkaProperties: KafkaProperties,
val config: KafkaCliConfiguration,
val mirrorConfig: MirrorConfig,
val deserializer: KafkaProtobufDeserializer<DynamicMessage>,
) : AbstractShellComponent() {
companion object {
val LOGGER: Logger = LoggerFactory.getLogger(KafkaTopicMirrorCommand::class.java)
}


val consumerAdminClient: AdminClient = AdminClient.create(kafkaProperties.buildConsumerProperties(null))
val producerAdminClient: AdminClient = AdminClient.create(kafkaProperties.buildProducerProperties(null))

Expand All @@ -69,6 +70,8 @@ class KafkaTopicMirrorCommand(
fun mirrorJson() {
val printer = JsonFormat.printer()

val deserializer = KafkaProtobufDeserializer<DynamicMessage>(schemaRegistry())

mirror {
val msg = printer.print(deserializer.deserialize(it.topic(), it.value().array())).toByteArray()

Expand All @@ -85,6 +88,21 @@ class KafkaTopicMirrorCommand(
}
}

private fun schemaRegistry(): SchemaRegistryClient? {
val configs = kafkaProperties.buildAdminProperties(null)

val urlString = configs["schema.registry.url"] as String
val urls = urlString.split(",".toRegex()).dropLastWhile { it.isEmpty() }

return SchemaRegistryClientFactory.newClient(
urls,
100_000,
listOf(ProtobufSchemaProvider()),
configs,
emptyMap()
)
}

@ShellMethod(key = ["mirror"], value = "unidirectional topic mirroring")
fun mirrorRaw() {
mirror {
Expand Down

0 comments on commit 30330b4

Please sign in to comment.