Skip to content

Commit

Permalink
feat: Multiple brokers (#125)
Browse files Browse the repository at this point in the history
* multiple brokers

* disable cgo for build target

* linting
  • Loading branch information
soerenschneider authored Jun 2, 2022
1 parent 27a834c commit 1cce676
Show file tree
Hide file tree
Showing 19 changed files with 63 additions and 54 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ clean:
rm -rf ./$(BUILD_DIR)

build: version-info
go build -ldflags="-X '$(MODULE)/internal.BuildVersion=${VERSION}' -X '$(MODULE)/internal.CommitHash=${COMMIT_HASH}'" -o $(BINARY_NAME_SERVER) cmd/server/server.go
go build -ldflags="-X '$(MODULE)/internal.BuildVersion=${VERSION}' -X '$(MODULE)/internal.CommitHash=${COMMIT_HASH}'" -o $(BINARY_NAME_CLIENT) cmd/client/client.go
CGO_ENABLED=0 go build -ldflags="-X '$(MODULE)/internal.BuildVersion=${VERSION}' -X '$(MODULE)/internal.CommitHash=${COMMIT_HASH}'" -o $(BINARY_NAME_SERVER) cmd/server/server.go
CGO_ENABLED=0 go build -ldflags="-X '$(MODULE)/internal.BuildVersion=${VERSION}' -X '$(MODULE)/internal.CommitHash=${COMMIT_HASH}'" -o $(BINARY_NAME_CLIENT) cmd/client/client.go

release: clean version-info cross-build-client cross-build-server
sha256sum $(BUILD_DIR)/dyndns-* > $(CHECKSUM_FILE)
Expand Down
26 changes: 14 additions & 12 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package client

import (
"errors"
"fmt"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/client/resolvers"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/events"
"github.com/soerenschneider/dyndns/internal/metrics"
"github.com/soerenschneider/dyndns/internal/verification"
"errors"
"fmt"
"github.com/rs/zerolog/log"
"time"
)

Expand All @@ -28,25 +28,25 @@ type State interface {
type Client struct {
signature verification.SignatureKeypair
resolver resolvers.IpResolver
dispatcher events.EventDispatch
dispatchers []events.EventDispatch
state State
lastStateChange time.Time
}

func NewClient(resolver resolvers.IpResolver, signature verification.SignatureKeypair, dispatcher events.EventDispatch) (*Client, error) {
func NewClient(resolver resolvers.IpResolver, signature verification.SignatureKeypair, dispatchers []events.EventDispatch) (*Client, error) {
if resolver == nil {
return nil, errors.New("no resolver provided")
}
if signature == nil {
return nil, errors.New("no signature provider given")
}
if dispatcher == nil {
return nil, errors.New("no dispatcher provided")
if dispatchers == nil || len(dispatchers) == 0 {
return nil, errors.New("no dispatchers provided")
}

c := Client{
resolver: resolver,
dispatcher: dispatcher,
dispatchers: dispatchers,
signature: signature,
state: &initialState{},
lastStateChange: time.Now(),
Expand Down Expand Up @@ -106,10 +106,12 @@ func (client *Client) Resolve(prev *common.ResolvedIp) (*common.ResolvedIp, erro
Signature: signature,
}

err := client.dispatcher.Notify(env)
if err != nil {
metrics.UpdateDispatchErrors.WithLabelValues(client.resolver.Host()).Inc()
return resolvedIp, fmt.Errorf("could not dispatch ip update notification: %v", err)
for _, dispatcher := range client.dispatchers {
err := dispatcher.Notify(env)
if err != nil {
metrics.UpdateDispatchErrors.WithLabelValues(client.resolver.Host()).Inc()
return resolvedIp, fmt.Errorf("could not dispatch ip update notification: %v", err)
}
}
metrics.UpdatesDispatched.Inc()
}
Expand Down
2 changes: 1 addition & 1 deletion client/resolvers/http_resolver.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package resolvers

import (
"github.com/soerenschneider/dyndns/internal/common"
"fmt"
"github.com/hashicorp/go-retryablehttp"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
"io/ioutil"
"math/rand"
"net"
Expand Down
2 changes: 1 addition & 1 deletion client/resolvers/interface_resolver.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package resolvers

import (
"github.com/soerenschneider/dyndns/internal/common"
"errors"
"fmt"
"github.com/soerenschneider/dyndns/internal/common"
"net"
"time"
)
Expand Down
4 changes: 2 additions & 2 deletions client/states.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package client

import (
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/util"
"fmt"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/util"
"math/rand"
"time"
)
Expand Down
13 changes: 8 additions & 5 deletions cmd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,16 @@ func RunClient(conf *conf.ClientConf) {
resolver, _ = resolvers.NewHttpResolver(conf.Host)
}

var dispatcher events.EventDispatch
dispatcher, err = mqtt.NewMqttDispatch(conf.Broker, conf.Host, fmt.Sprintf("dyndns/%s", conf.Host))
if err != nil {
log.Fatal().Msgf("Could not build mqtt dispatcher: %v", err)
var dispatchers []events.EventDispatch
for _, broker := range conf.Brokers {
dispatcher, err := mqtt.NewMqttDispatch(broker, conf.Host, fmt.Sprintf("dyndns/%s", conf.Host))
if err != nil {
log.Fatal().Msgf("Could not build mqtt dispatcher: %v", err)
}
dispatchers = append(dispatchers, dispatcher)
}

client, err := client.NewClient(resolver, keypair, dispatcher)
client, err := client.NewClient(resolver, keypair, dispatchers)
if err != nil {
log.Fatal().Msgf("could not build client: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws/credentials"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/conf"
"github.com/soerenschneider/dyndns/internal"
"github.com/soerenschneider/dyndns/internal/common"
Expand All @@ -10,12 +16,6 @@ import (
"github.com/soerenschneider/dyndns/server"
"github.com/soerenschneider/dyndns/server/dns"
"github.com/soerenschneider/dyndns/server/vault"
"encoding/json"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws/credentials"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -94,7 +94,7 @@ func RunServer(configPath string) {
}
conf.Print()

mqttServer, err := mqtt.NewMqttServer(conf.Broker, conf.ClientId, notificationTopic, HandleChangeRequest)
mqttServer, err := mqtt.NewMqttServer(conf.Brokers, conf.ClientId, notificationTopic, HandleChangeRequest)
if err != nil {
log.Fatal().Msgf("Could not build mqtt dispatcher: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion conf/client.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package conf

import (
"github.com/soerenschneider/dyndns/internal/metrics"
"encoding/json"
"errors"
"fmt"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/metrics"
"io/ioutil"
)

Expand Down
12 changes: 7 additions & 5 deletions conf/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (
)

type MqttConfig struct {
Broker string `json:"broker"`
ClientId string `json:"client_id"`
Brokers []string `json:"brokers"`
ClientId string `json:"client_id"`
}

func (conf *MqttConfig) Print() {
log.Info().Msgf("Broker=%s", conf.Broker)
log.Info().Msgf("Brokers=%v", conf.Brokers)
log.Info().Msgf("ClientId=%s", conf.ClientId)
}

func (conf *MqttConfig) Validate() error {
if !IsValidUrl(conf.Broker) {
return fmt.Errorf("no valid host given: %s", conf.Broker)
for _, broker := range conf.Brokers {
if !IsValidUrl(broker) {
return fmt.Errorf("no valid host given: %s", broker)
}
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions conf/server.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package conf

import (
"github.com/soerenschneider/dyndns/internal/metrics"
"github.com/soerenschneider/dyndns/internal/verification"
"encoding/json"
"errors"
"fmt"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/metrics"
"github.com/soerenschneider/dyndns/internal/verification"
"io/ioutil"
)

Expand Down
2 changes: 1 addition & 1 deletion internal/events/eventbus.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package events

import (
"github.com/soerenschneider/dyndns/internal/common"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/soerenschneider/dyndns/internal/common"
)

type EventDispatch interface {
Expand Down
12 changes: 7 additions & 5 deletions internal/events/mqtt/mqtt_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package mqtt

import (
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/metrics"
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/metrics"
"time"
)

Expand All @@ -27,7 +27,7 @@ var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err
metrics.MqttConnectionsLostTotal.Inc()
}

func NewMqttDispatch(broker, clientId, notificationTopic string) (*MqttBus, error) {
func NewMqttDispatch(broker string, clientId, notificationTopic string) (*MqttBus, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientId)
Expand All @@ -47,9 +47,11 @@ func NewMqttDispatch(broker, clientId, notificationTopic string) (*MqttBus, erro
}, nil
}

func NewMqttServer(broker, clientId, notificationTopic string, handler func(client mqtt.Client, msg mqtt.Message)) (*MqttBus, error) {
func NewMqttServer(brokers []string, clientId, notificationTopic string, handler func(client mqtt.Client, msg mqtt.Message)) (*MqttBus, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
for _, broker := range brokers {
opts.AddBroker(broker)
}
opts.SetClientID(clientId)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
Expand Down
2 changes: 1 addition & 1 deletion internal/util/logging.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package util

import (
"github.com/soerenschneider/dyndns/internal"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal"
"golang.org/x/crypto/ssh/terminal"
"os"
"time"
Expand Down
2 changes: 1 addition & 1 deletion internal/verification/common.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package verification

import (
"github.com/soerenschneider/dyndns/internal/common"
"encoding/base64"
"github.com/soerenschneider/dyndns/internal/common"
)

func DecodeBase64(input string) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/verification/ed25519.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package verification
import (
"crypto/ed25519"
"crypto/rand"
"github.com/soerenschneider/dyndns/internal/common"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
"io/ioutil"
)

Expand Down
2 changes: 1 addition & 1 deletion server/dns/route53.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package dns

import (
"github.com/soerenschneider/dyndns/internal/common"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/route53"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
)

type Route53Propagator struct {
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package server

import (
"errors"
"fmt"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/conf"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/events"
"github.com/soerenschneider/dyndns/internal/metrics"
"github.com/soerenschneider/dyndns/internal/util"
"github.com/soerenschneider/dyndns/internal/verification"
"github.com/soerenschneider/dyndns/server/dns"
"errors"
"fmt"
"github.com/rs/zerolog/log"
"time"
)

Expand Down
4 changes: 2 additions & 2 deletions server/vault/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package vault

import (
"bytes"
"github.com/soerenschneider/dyndns/conf"
"github.com/soerenschneider/dyndns/internal/metrics"
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/hashicorp/go-retryablehttp"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/conf"
"github.com/soerenschneider/dyndns/internal/metrics"
"io/ioutil"
"net/http"
"time"
Expand Down
2 changes: 1 addition & 1 deletion server/vault/client_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package vault

import (
"github.com/soerenschneider/dyndns/conf"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/soerenschneider/dyndns/conf"
"net/http"
"reflect"
"testing"
Expand Down

0 comments on commit 1cce676

Please sign in to comment.