Skip to content

Commit

Permalink
Allow passing through a message key for Kafka executor (#1021)
Browse files Browse the repository at this point in the history
* Allow passing through a message key for Kafka executor
* Add Kafka message key to website usage page in executors documentation
* Add unit test for with/without Kafka executor message key
  • Loading branch information
ConductorPete committed Nov 3, 2021
1 parent 6426961 commit 5703402
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
2 changes: 2 additions & 0 deletions builtin/bins/dkron-executor-kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Kafka struct {
// "executor": "kafka",
// "executor_config": {
// "brokerAddress": "192.168.59.103:9092", // kafka broker url
// "key": "",
// "message": "",
// "topic": "publishTopic"
// }
Expand Down Expand Up @@ -78,6 +79,7 @@ func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) {

msg := &sarama.ProducerMessage{
Topic: args.Config["topic"],
Key: sarama.StringEncoder(args.Config["key"]),
Value: sarama.StringEncoder(args.Config["message"]),
}

Expand Down
24 changes: 22 additions & 2 deletions builtin/bins/dkron-executor-kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
dktypes "github.com/distribworks/dkron/v3/plugin/types"
)

func TestProduceExecute(t *testing.T) {
func TestProduceExecuteWithKey(t *testing.T) {
pa := &dktypes.ExecuteRequest{
JobName: "testJob",
JobName: "testJobWithKey",
Config: map[string]string{
"topic": "test",
"brokerAddress": "testaddress",
"key": "testkey",
"message": "{\"hello\":11}",
"debug": "true",
},
Expand All @@ -25,3 +26,22 @@ func TestProduceExecute(t *testing.T) {
t.Fatal(err)
}
}

func TestProduceExecuteWithoutKey(t *testing.T) {
pa := &dktypes.ExecuteRequest{
JobName: "testJobWithoutKey",
Config: map[string]string{
"topic": "test",
"brokerAddress": "testaddress",
"message": "{\"hello\":11}",
"debug": "true",
},
}
kafka := &Kafka{}
output, err := kafka.Execute(pa, nil)
fmt.Println(string(output.Output))
fmt.Println(err)
if err != nil {
t.Fatal(err)
}
}
4 changes: 3 additions & 1 deletion website/content/usage/executors/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Params

```
brokerAddress: "IP:port" of the broker
message: The message to produce
key: The key of the message to produce
message: The body of the message to produce
topic: The Kafka topic for this message
debug: Turns on debugging output if not empty
```
Expand All @@ -22,6 +23,7 @@ Example
"executor": "kafka",
"executor_config": {
"brokerAddress": "localhost:9092",
"key": "My key",
"message": "My message",
"topic": "my_topic"
}
Expand Down

0 comments on commit 5703402

Please sign in to comment.