Skip to content

Commit

Permalink
fix: don't parse transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
ccamel committed Apr 12, 2022
1 parent e68f808 commit f2a2039
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions src/main/kotlin/com/okp4/processor/cosmos/topology.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.okp4.processor.cosmos

import cosmos.tx.v1beta1.TxOuterClass
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
Expand Down Expand Up @@ -39,11 +38,15 @@ fun topology(props: Properties): Topology {
).peek(
{ _, block -> logger.debug("→ block ${block.header.height} (${block.data.txsCount} txs)") },
Named.`as`("log")
).flatMapValues({ block ->
block.data.txsList.map { tx ->
TxOuterClass.Tx.parseFrom(tx).toByteArray()
}
}, Named.`as`("extract-transactions")).to(
).flatMapValues(
{ block ->
block.data.txsList
}, Named.`as`("extract-transactions")
).mapValues(
{ tx ->
tx.toByteArray()
}, Named.`as`("convert-transactions-to-bytearray")
).to(
topicOut, Produced.with(Serdes.String(), Serdes.ByteArray()).withName("output")
)
}.build()
Expand Down

0 comments on commit f2a2039

Please sign in to comment.