Skip to content

Commit

Permalink
feat: add topology logic
Browse files Browse the repository at this point in the history
splitting blocks into transactions
  • Loading branch information
ingvaar committed Apr 11, 2022
1 parent ca8f26e commit 51a5d14
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
6 changes: 6 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ dependencies {
api("io.micrometer:micrometer-core:$micrometerVersion")
api("io.micrometer:micrometer-registry-prometheus:$micrometerVersion")

val cosmosSdkVersion = "1.0-SNAPSHOT"
implementation("com.okp4.grpc:cosmos-sdk:$cosmosSdkVersion")

val grpcVersion = "1.45.1"
api("io.grpc:grpc-protobuf:$grpcVersion")

testImplementation(kotlin("test"))

val kotestVersion = "5.2.1"
Expand Down
14 changes: 8 additions & 6 deletions src/main/kotlin/com/okp4/processor/cosmos/topology.kt
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package com.okp4.processor.cosmos

import cosmos.tx.v1beta1.TxOuterClass
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.Produced
import org.slf4j.LoggerFactory
import tendermint.types.BlockOuterClass
import java.util.*

/**
* Simple Kafka Stream Processor that consumes a message on a topic and returns a new message on another.
* Simple Kafka Stream Processor that consumes a block on a topic and returns his transactions on another.
*/
fun topology(props: Properties): Topology {
val logger = LoggerFactory.getLogger("com.okp4.processor.cosmos.topology")
Expand All @@ -24,9 +25,10 @@ fun topology(props: Properties): Topology {

return StreamsBuilder()
.apply {
stream(topicIn, Consumed.with(Serdes.String(), Serdes.String()).withName("input"))
.peek({ _, _ -> logger.info("Received a message") }, Named.`as`("log"))
.map({ k, v -> KeyValue(k, "Hello $v!") }, Named.`as`("map-value"))
.to(topicOut, Produced.with(Serdes.String(), Serdes.String()).withName("output"))
stream(topicIn, Consumed.with(Serdes.String(), Serdes.ByteArray()).withName("input"))
.mapValues({ v -> BlockOuterClass.Block.parseFrom(v) }, Named.`as`("block-deserialization"))
.peek({ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") }, Named.`as`("log"))
.flatMapValues({ block -> block.data.txsList.map { tx -> TxOuterClass.Tx.parseFrom(tx).toByteArray() } }, Named.`as`("extract-transactions"))
.to(topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output"))
}.build()
}

0 comments on commit 51a5d14

Please sign in to comment.