Skip to content

Commit

Permalink
feat: decouple detection and change propagation (#210)
Browse files Browse the repository at this point in the history
* feat: decouple transmitting updates from detecting them

* fix: init map

* fix: run reconciler in goroutine

* fix: range gotcha
  • Loading branch information
soerenschneider authored Jan 27, 2023
1 parent b74e01d commit c687a2d
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 143 deletions.
23 changes: 7 additions & 16 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/client/resolvers"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/events"
"github.com/soerenschneider/dyndns/internal/metrics"
"github.com/soerenschneider/dyndns/internal/verification"
"time"
Expand All @@ -28,25 +27,25 @@ type State interface {
type Client struct {
signature verification.SignatureKeypair
resolver resolvers.IpResolver
dispatchers []events.EventDispatch
reconciler *Reconciler
state State
lastStateChange time.Time
}

func NewClient(resolver resolvers.IpResolver, signature verification.SignatureKeypair, dispatchers []events.EventDispatch) (*Client, error) {
func NewClient(resolver resolvers.IpResolver, signature verification.SignatureKeypair, reconciler *Reconciler) (*Client, error) {
if resolver == nil {
return nil, errors.New("no resolver provided")
}
if signature == nil {
return nil, errors.New("no signature provider given")
}
if dispatchers == nil || len(dispatchers) == 0 {
return nil, errors.New("no dispatchers provided")
if reconciler == nil {
return nil, errors.New("no reconciler provided")
}

c := Client{
resolver: resolver,
dispatchers: dispatchers,
reconciler: reconciler,
signature: signature,
state: &initialState{},
lastStateChange: time.Now(),
Expand Down Expand Up @@ -100,19 +99,11 @@ func (client *Client) Resolve(prev *common.ResolvedIp) (*common.ResolvedIp, erro

if client.state.EvaluateState(client, resolvedIp) {
signature := client.signature.Sign(*resolvedIp)
env := common.Envelope{
env := &common.Envelope{
PublicIp: *resolvedIp,
Signature: signature,
}

for _, dispatcher := range client.dispatchers {
err := dispatcher.Notify(env)
if err != nil {
metrics.UpdateDispatchErrors.WithLabelValues(client.resolver.Host()).Inc()
return resolvedIp, fmt.Errorf("could not dispatch ip update notification: %v", err)
}
}
metrics.UpdatesDispatched.Inc()
client.reconciler.RegisterUpdate(env)
}

return resolvedIp, nil
Expand Down
97 changes: 97 additions & 0 deletions client/reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package client

import (
"errors"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/events"
"github.com/soerenschneider/dyndns/internal/metrics"
"sync"
"time"
)

type Reconciler struct {
env *common.Envelope
dispatchers map[string]events.EventDispatch
mutex sync.Mutex

pendingChanges map[string]events.EventDispatch
}

func NewReconciler(dispatchers map[string]events.EventDispatch) (*Reconciler, error) {
if len(dispatchers) < 1 {
return nil, errors.New("no dispatchers supplied")
}

return &Reconciler{
dispatchers: dispatchers,
mutex: sync.Mutex{},
}, nil
}

func (r *Reconciler) RegisterUpdate(env *common.Envelope) {
if env == nil {
return
}

r.mutex.Lock()
r.env = env

r.pendingChanges = make(map[string]events.EventDispatch, len(r.dispatchers))
for i, dispatcher := range r.dispatchers {
r.pendingChanges[i] = dispatcher
}
metrics.ReconcilersActive.WithLabelValues(env.PublicIp.Host).Set(float64(len(r.pendingChanges)))

r.mutex.Unlock()
r.dispatch()
}

func (r *Reconciler) dispatch() {
r.mutex.Lock()
defer r.mutex.Unlock()

if len(r.pendingChanges) == 0 {
return
}

metrics.ReconcilerTimestamp.WithLabelValues(r.env.PublicIp.Host).SetToCurrentTime()
log.Info().Msgf("Reconciling %d dispatchers", len(r.pendingChanges))

timeStart := time.Now()
wg := sync.WaitGroup{}
wg.Add(len(r.pendingChanges))
for key, _ := range r.pendingChanges {
dispatcher := r.pendingChanges[key]
go func(key string) {
err := dispatcher.Notify(r.env)
if err == nil {
r.pendingChanges[key] = nil
delete(r.pendingChanges, key)
metrics.UpdatesDispatched.Inc()
log.Info().Msgf("Reconciliation for dispatcher %s successful", key)
} else {
log.Warn().Msgf("Reconciliation for dispatcher %s failed: %v", key, err)
}
wg.Done()
}(key)
}

wg.Wait()
timeSpent := time.Now().Sub(timeStart)

log.Info().Msgf("Spent %v on reconciliation (%d dispatchers)", timeSpent, len(r.dispatchers))
metrics.ReconcilersActive.WithLabelValues(r.env.PublicIp.Host).Set(float64(len(r.pendingChanges)))
}

func (r *Reconciler) Run() {
interval := 1 * time.Minute
ticker := time.NewTicker(interval)

for {
select {
case <-ticker.C:
r.dispatch()
}
}
}
4 changes: 2 additions & 2 deletions client/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func (state *ipNotConfirmedState) EvaluateState(context *Client, resolved *commo
}

log.Info().Msgf("DNS entry for host %s differs to new ip: %v", resolved.Host, resolved)
if state.checks%120 == 0 {
log.Info().Msgf("Verifying for %d minutes already, re-sending message..", int64(state.waitInterval.Seconds())*state.checks/60)
if state.checks%10 == 0 {
log.Info().Msgf("Verifying for %v minutes already, re-sending message..", int64(state.waitInterval.Seconds())*state.checks/60)
return true
}

Expand Down
24 changes: 18 additions & 6 deletions cmd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,30 @@ func RunClient(conf *conf.ClientConf) {
resolver, _ = resolvers.NewHttpResolver(conf.Host, conf.Urls)
}

var dispatchers []events.EventDispatch
dispatcher, err := mqtt.NewMqttDispatch(conf.Brokers, conf.ClientId, fmt.Sprintf("dyndns/%s", conf.Host), conf.TlsConfig())
if err != nil {
log.Error().Msgf("Could not build mqtt dispatcher: %v", err)
dispatchers := map[string]events.EventDispatch{}
for _, broker := range conf.Brokers {
dispatcher, err := mqtt.NewMqttClient(broker, conf.ClientId, fmt.Sprintf("dyndns/%s", conf.Host), conf.TlsConfig())
if err != nil {
log.Error().Msgf("Could not build mqtt dispatcher: %v", err)
} else {
dispatchers[broker] = dispatcher
}
}
dispatchers = append(dispatchers, dispatcher)

client, err := client.NewClient(resolver, keypair, dispatchers)
if len(dispatchers) == 0 {
log.Fatal().Msg("not a single dispatcher built, exiting")
}

reconciler, err := client.NewReconciler(dispatchers)
if err != nil {
log.Fatal().Err(err).Msg("could not build reconciler")
}
client, err := client.NewClient(resolver, keypair, reconciler)
if err != nil {
log.Fatal().Msgf("could not build client: %v", err)
}

go reconciler.Run()
if conf.Once {
_, err := client.ResolveSingle()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/events/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type EventDispatch interface {
Notify(msg common.Envelope) error
Notify(msg *common.Envelope) error
}

type EventListener interface {
Expand Down
73 changes: 73 additions & 0 deletions internal/events/mqtt/mqtt_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//go:build client

package mqtt

import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/common"
"github.com/soerenschneider/dyndns/internal/metrics"
"time"
)

const publishWaitTimeout = 2 * time.Minute

type MqttClientBus struct {
client mqtt.Client
notificationTopic string
}

var onConnectHandler = func(c mqtt.Client) {
opts := c.OptionsReader()
log.Info().Msgf("Connected to brokers %v", opts.Servers())
mutex.Lock()
metrics.MqttBrokersConnectedTotal.Add(1)
mutex.Unlock()
}

func NewMqttClient(broker string, clientId, notificationTopic string, tlsConfig *tls.Config) (*MqttClientBus, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(broker)
opts.SetClientID(clientId)

opts.OnConnectionLost = connectLostHandler
opts.OnConnect = onConnectHandler
opts.AutoReconnect = true

if tlsConfig != nil {
opts.SetTLSConfig(tlsConfig)
}

client := mqtt.NewClient(opts)
token := client.Connect()
if token.WaitTimeout(60*time.Second) && token.Error() != nil {
return nil, fmt.Errorf("could not connect to %s: %v", broker, token.Error())
}

return &MqttClientBus{
client: client,
notificationTopic: notificationTopic,
}, nil
}

func (d *MqttClientBus) Notify(msg *common.Envelope) error {
payload, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("could not marshal envelope: %v", err)
}
opts := d.client.OptionsReader()
log.Debug().Msgf("Sending %v to %v", string(payload), opts.Servers())

token := d.client.Publish(d.notificationTopic, 1, true, payload)
ok := token.WaitTimeout(publishWaitTimeout)
if !ok {
return errors.New("received timeout when trying to publish the message")
}
log.Debug().Msgf("Dispatched message to %v", opts.Servers())

return nil
}
19 changes: 19 additions & 0 deletions internal/events/mqtt/mqtt_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mqtt

import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/rs/zerolog/log"
"github.com/soerenschneider/dyndns/internal/metrics"
"sync"
)

var mutex sync.Mutex

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
opts := client.OptionsReader()
log.Info().Msgf("Connection lost from %v: %v", opts.Servers(), err)
metrics.MqttConnectionsLostTotal.Inc()
mutex.Lock()
defer mutex.Unlock()
metrics.MqttBrokersConnectedTotal.Sub(1)
}
Loading

0 comments on commit c687a2d

Please sign in to comment.