-
Notifications
You must be signed in to change notification settings - Fork 379
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add kafka and nats executer * Update nats and kafka interfaces to dkron v3 * Minor improvements to naming, log messages and documentation Co-authored-by: mmonier <mmonier@iotblue.net> Co-authored-by: Victor Castell <victor@victorcastell.com>
- Loading branch information
1 parent
453a94d
commit d17593b
Showing
13 changed files
with
356 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package main | ||
|
||
import ( | ||
"errors" | ||
"log" | ||
|
||
"github.com/Shopify/sarama" | ||
"github.com/armon/circbuf" | ||
|
||
dkplugin "github.com/distribworks/dkron/v3/plugin" | ||
dktypes "github.com/distribworks/dkron/v3/plugin/types" | ||
) | ||
|
||
const ( | ||
// maxBufSize limits how much data we collect from a handler. | ||
// This is to prevent Serf's memory from growing to an enormous | ||
// amount due to a faulty handler. | ||
maxBufSize = 500000 | ||
) | ||
|
||
// Kafka process kafka request | ||
type Kafka struct { | ||
} | ||
|
||
// Execute Process method of the plugin | ||
// "executor": "kafka", | ||
// "executor_config": { | ||
// "brokerAddress": "192.168.59.103:9092", // kafka broker url | ||
// "message": "", | ||
// "topic": "publishTopic" | ||
// } | ||
func (s *Kafka) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) { | ||
|
||
out, err := s.ExecuteImpl(args) | ||
resp := &dktypes.ExecuteResponse{Output: out} | ||
if err != nil { | ||
resp.Error = err.Error() | ||
} | ||
return resp, nil | ||
} | ||
|
||
// ExecuteImpl produce message on Kafka broker | ||
func (s *Kafka) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { | ||
|
||
output, _ := circbuf.NewBuffer(maxBufSize) | ||
|
||
var debug bool | ||
if args.Config["debug"] != "" { | ||
debug = true | ||
log.Printf("config %#v\n\n", args.Config) | ||
} | ||
|
||
if args.Config["brokerAddress"] == "" { | ||
|
||
return output.Bytes(), errors.New("brokerAddress is empty") | ||
} | ||
|
||
if args.Config["topic"] == "" { | ||
return output.Bytes(), errors.New("topic is empty") | ||
} | ||
config := sarama.NewConfig() | ||
config.Producer.RequiredAcks = sarama.WaitForAll | ||
config.Producer.Retry.Max = 5 | ||
config.Producer.Return.Successes = true | ||
config.Producer.Return.Errors = true | ||
|
||
brokers := []string{args.Config["brokerAddress"]} | ||
producer, err := sarama.NewSyncProducer(brokers, config) | ||
if err != nil { | ||
// Should not reach here | ||
|
||
if debug { | ||
log.Printf("sarama %#v\n\n", config) | ||
} | ||
return output.Bytes(), err | ||
} | ||
defer producer.Close() | ||
|
||
msg := &sarama.ProducerMessage{ | ||
Topic: args.Config["topic"], | ||
Value: sarama.StringEncoder(args.Config["message"]), | ||
} | ||
|
||
_, _, err = producer.SendMessage(msg) | ||
|
||
if err != nil { | ||
return output.Bytes(), err | ||
} | ||
|
||
output.Write([]byte("Result: successfully produced the message on Kafka broker\n")) | ||
return output.Bytes(), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
dktypes "github.com/distribworks/dkron/v3/plugin/types" | ||
) | ||
|
||
func TestProduceExecute(t *testing.T) { | ||
pa := &dktypes.ExecuteRequest{ | ||
JobName: "testJob", | ||
Config: map[string]string{ | ||
"topic": "test", | ||
"brokerAddress": "testaddress", | ||
"message": "{\"hello\":11}", | ||
"debug": "true", | ||
}, | ||
} | ||
kafka := &Kafka{} | ||
output, err := kafka.Execute(pa, nil) | ||
fmt.Println(string(output.Output)) | ||
fmt.Println(err) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package main | ||
|
||
import ( | ||
dkplugin "github.com/distribworks/dkron/v3/plugin" | ||
"github.com/hashicorp/go-plugin" | ||
) | ||
|
||
func main() { | ||
plugin.Serve(&plugin.ServeConfig{ | ||
HandshakeConfig: dkplugin.Handshake, | ||
Plugins: map[string]plugin.Plugin{ | ||
"executor": &dkplugin.ExecutorPlugin{Executor: &Kafka{}}, | ||
}, | ||
|
||
// A non-nil value here enables gRPC serving for this plugin... | ||
GRPCServer: plugin.DefaultGRPCServer, | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package main | ||
|
||
import ( | ||
dkplugin "github.com/distribworks/dkron/v3/plugin" | ||
"github.com/hashicorp/go-plugin" | ||
) | ||
|
||
func main() { | ||
plugin.Serve(&plugin.ServeConfig{ | ||
HandshakeConfig: dkplugin.Handshake, | ||
Plugins: map[string]plugin.Plugin{ | ||
"executor": &dkplugin.ExecutorPlugin{Executor: &Nats{}}, | ||
}, | ||
|
||
// A non-nil value here enables gRPC serving for this plugin... | ||
GRPCServer: plugin.DefaultGRPCServer, | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package main | ||
|
||
import ( | ||
"errors" | ||
"log" | ||
|
||
"github.com/armon/circbuf" | ||
"github.com/nats-io/nats.go" | ||
|
||
dkplugin "github.com/distribworks/dkron/v3/plugin" | ||
dktypes "github.com/distribworks/dkron/v3/plugin/types" | ||
) | ||
|
||
const ( | ||
// maxBufSize limits how much data we collect from a handler. | ||
// This is to prevent Serf's memory from growing to an enormous | ||
// amount due to a faulty handler. | ||
maxBufSize = 500000 | ||
) | ||
|
||
// Nats process http request | ||
type Nats struct { | ||
} | ||
|
||
// Execute Process method of the plugin | ||
// "executor": "nats", | ||
// "executor_config": { | ||
// "url": "tls://nats.demo.io:4443", // nats server url | ||
// "message": "", | ||
// "subject": "Subject", | ||
// "userName":"test@hbh.dfg", | ||
// "password":"dfdffs" | ||
// } | ||
func (s *Nats) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) { | ||
|
||
out, err := s.ExecuteImpl(args) | ||
resp := &dktypes.ExecuteResponse{Output: out} | ||
if err != nil { | ||
resp.Error = err.Error() | ||
} | ||
return resp, nil | ||
} | ||
|
||
// ExecuteImpl do http request | ||
func (s *Nats) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) { | ||
|
||
output, _ := circbuf.NewBuffer(maxBufSize) | ||
|
||
var debug bool | ||
if args.Config["debug"] != "" { | ||
debug = true | ||
log.Printf("config %#v\n\n", args.Config) | ||
} | ||
|
||
if args.Config["url"] == "" { | ||
|
||
return output.Bytes(), errors.New("url is empty") | ||
} | ||
|
||
if args.Config["subject"] == "" { | ||
return output.Bytes(), errors.New("subject is empty") | ||
} | ||
nc, err := nats.Connect(args.Config["url"], nats.UserInfo(args.Config["userName"], args.Config["password"])) | ||
|
||
if err != nil { | ||
return output.Bytes(), errors.New("error connecting to NATS") | ||
} | ||
|
||
nc.Publish(args.Config["subject"], []byte(args.Config["message"])) | ||
|
||
output.Write([]byte("Result: Message successfully sent\n")) | ||
|
||
if debug { | ||
log.Printf("request %#v\n\n", nc) | ||
} | ||
|
||
return output.Bytes(), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
|
||
dktypes "github.com/distribworks/dkron/v3/plugin/types" | ||
) | ||
|
||
func TestPublishExecute(t *testing.T) { | ||
pa := &dktypes.ExecuteRequest{ | ||
JobName: "testJob", | ||
Config: map[string]string{ | ||
"subject": "opcuaReadRequest", | ||
"url": "localhost:4222", | ||
"message": "{\"hello\":11}", | ||
"debug": "true", | ||
}, | ||
} | ||
nats := &Nats{} | ||
output, err := nats.Execute(pa, nil) | ||
fmt.Println(string(output.Output)) | ||
fmt.Println(err) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.