diff --git a/src/main/kotlin/com/okp4/processor/cosmos/topology.kt b/src/main/kotlin/com/okp4/processor/cosmos/topology.kt index aa38fa6..7057875 100644 --- a/src/main/kotlin/com/okp4/processor/cosmos/topology.kt +++ b/src/main/kotlin/com/okp4/processor/cosmos/topology.kt @@ -1,6 +1,5 @@ package com.okp4.processor.cosmos -import cosmos.tx.v1beta1.TxOuterClass import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.StreamsBuilder @@ -39,11 +38,15 @@ fun topology(props: Properties): Topology { ).peek( { _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") }, Named.`as`("log") - ).flatMapValues({ block -> - block.data.txsList.map { tx -> - TxOuterClass.Tx.parseFrom(tx).toByteArray() - } - }, Named.`as`("extract-transactions")).to( + ).flatMapValues( + { block -> + block.data.txsList + }, Named.`as`("extract-transactions") + ).mapValues( + { tx -> + tx.toByteArray() + }, Named.`as`("convert-transactions-to-bytearray") + ).to( topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output") ) }.build()