Skip to content

Commit

Permalink
add kafka plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Ergels Gaxhaj committed Feb 4, 2022
1 parent d39ffdd commit c2fe76f
Show file tree
Hide file tree
Showing 10 changed files with 1,634 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/cockroachdb/apd/v2 v2.0.2
github.com/coinbase/rosetta-sdk-go v0.7.2
github.com/confio/ics23/go v0.6.6
github.com/confluentinc/confluent-kafka-go v1.8.2
github.com/cosmos/btcutil v1.0.4
github.com/cosmos/cosmos-proto v1.0.0-alpha6
github.com/cosmos/cosmos-sdk/db v1.0.0-beta.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ github.com/coinbase/rosetta-sdk-go v0.7.2 h1:uCNrASIyt7rV9bA3gzPG3JDlxVP5v/zLgi0
github.com/coinbase/rosetta-sdk-go v0.7.2/go.mod h1:wk9dvjZFSZiWSNkFuj3dMleTA1adLFotg5y71PhqKB4=
github.com/confio/ics23/go v0.6.6 h1:pkOy18YxxJ/r0XFDCnrl4Bjv6h4LkBSpLS6F38mrKL8=
github.com/confio/ics23/go v0.6.6/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg=
github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/consensys/bavard v0.1.8-0.20210406032232-f3452dc9b572/go.mod h1:Bpd0/3mZuaj6Sj+PqrmIquiOKy397AKGThQPaGzNXAQ=
github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f/go.mod h1:815PAHg3wvysy0SyIqanF8gZ0Y1wjk/hrDHD/iT88+Q=
github.com/containerd/console v1.0.2/go.mod h1:ytZPjGgY2oeTkAONYafi2kSj0aYggsf8acV1PGKCbzQ=
Expand Down
44 changes: 44 additions & 0 deletions plugin/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,47 @@ print_data_to_stdout = false
# whether to operate in fire-and-forget or success/failure acknowledgement mode
# false == fire-and-forget; true == sends a message receipt success/fail signal
ack = "false"

###############################################################################
### Kafka Plugin configuration ###
###############################################################################

# The specific parameters for the Kafka streaming service plugin
[plugins.streaming.kafka]

# List of store keys we want to expose for this streaming service.
keys = []

# Optional topic prefix for the topic(s) where data will be stored
topic_prefix = "block"

# Flush and wait for outstanding messages and requests to complete delivery. (milliseconds)
flush_timeout_ms = 1500

# whether to operate in fire-and-forget or success/failure acknowledgement mode
# false == fire-and-forget; true == sends a message receipt success/fail signal
ack = "false"

# Producer configuration properties.
# The plugin uses confluent-kafka-go which is a lightweight wrapper around librdkafka.
# For a full list of producer configuration properties
# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
[plugins.streaming.kafka.producer]

# Initial list of brokers as a comma seperated list of broker host or host:port[, host:port[,...]]
bootstrap_servers = "localhost:9092"

# Client identifier
client_id = "my-app-id"

# This field indicates the number of acknowledgements the leader
# broker must receive from ISR brokers before responding to the request
acks = "all"

# When set to true, the producer will ensure that messages
# are successfully produced exactly once and in the original produce order.
# The following configuration properties are adjusted automatically (if not modified by the user)
# when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5),
# retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo.
# Producer instantation will fail if user-supplied configuration is incompatible.
enable_idempotence = true
2 changes: 2 additions & 0 deletions plugin/loader/preload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package loader

import (
pluginfile "github.com/cosmos/cosmos-sdk/plugin/plugins/file"
pluginkafka "github.com/cosmos/cosmos-sdk/plugin/plugins/kafka"
plugintrace "github.com/cosmos/cosmos-sdk/plugin/plugins/trace"
)

Expand All @@ -11,5 +12,6 @@ import (

func init() {
Preload(pluginfile.Plugins...)
Preload(pluginkafka.Plugins...)
Preload(plugintrace.Plugins...)
}
1 change: 1 addition & 0 deletions plugin/loader/preload_list
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
# name go-path number of the sub-plugin or *

file github.com/cosmos/cosmos-sdk/plugin/plugins/file *
kafka github.com/cosmos/cosmos-sdk/plugin/plugins/kafka *
trace github.com/cosmos/cosmos-sdk/plugin/plugins/trace *
Loading

0 comments on commit c2fe76f

Please sign in to comment.