diff --git a/build.gradle.kts b/build.gradle.kts index e71787a..5080366 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -60,6 +60,12 @@ dependencies { api("io.micrometer:micrometer-core:$micrometerVersion") api("io.micrometer:micrometer-registry-prometheus:$micrometerVersion") + val cosmosSdkVersion = "1.0-SNAPSHOT" + implementation("com.okp4.grpc:cosmos-sdk:$cosmosSdkVersion") + + val grpcVersion = "1.45.1" + api("io.grpc:grpc-protobuf:$grpcVersion") + testImplementation(kotlin("test")) val kotestVersion = "5.2.1" diff --git a/src/main/kotlin/com/okp4/processor/cosmos/topology.kt b/src/main/kotlin/com/okp4/processor/cosmos/topology.kt index 8a5dbb0..c70cd44 100644 --- a/src/main/kotlin/com/okp4/processor/cosmos/topology.kt +++ b/src/main/kotlin/com/okp4/processor/cosmos/topology.kt @@ -1,17 +1,18 @@ package com.okp4.processor.cosmos +import cosmos.tx.v1beta1.TxOuterClass import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.Topology import org.apache.kafka.streams.kstream.Consumed import org.apache.kafka.streams.kstream.Named import org.apache.kafka.streams.kstream.Produced import org.slf4j.LoggerFactory +import tendermint.types.BlockOuterClass import java.util.* /** - * Simple Kafka Stream Processor that consumes a message on a topic and returns a new message on another. + * Simple Kafka Stream Processor that consumes a block on a topic and returns his transactions on another. */ fun topology(props: Properties): Topology { val logger = LoggerFactory.getLogger("com.okp4.processor.cosmos.topology") @@ -24,9 +25,10 @@ fun topology(props: Properties): Topology { return StreamsBuilder() .apply { - stream(topicIn, Consumed.with(Serdes.String(), Serdes.String()).withName("input")) - .peek({ _, _ -> logger.info("Received a message") }, Named.`as`("log")) - .map({ k, v -> KeyValue(k, "Hello $v!") }, Named.`as`("map-value")) - .to(topicOut, Produced.with(Serdes.String(), Serdes.String()).withName("output")) + stream(topicIn, Consumed.with(Serdes.String(), Serdes.ByteArray()).withName("input")) + .mapValues({ v -> BlockOuterClass.Block.parseFrom(v) }, Named.`as`("block-deserialization")) + .peek({ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") }, Named.`as`("log")) + .flatMapValues({ block -> block.data.txsList.map { tx -> TxOuterClass.Tx.parseFrom(tx).toByteArray() } }, Named.`as`("extract-transactions")) + .to(topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output")) }.build() }