Skip to content

Commit

Permalink
feat(outputs.graphite): Retry connecting to servers with failed send …
Browse files Browse the repository at this point in the history
…attempts (#11439)
  • Loading branch information
sspaink authored Jul 25, 2022
1 parent a201ae4 commit beb18d9
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 31 deletions.
104 changes: 80 additions & 24 deletions plugins/outputs/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -34,6 +36,7 @@ type Graphite struct {

conns []net.Conn
tlsint.ClientConfig
failedServers []string
}

func (*Graphite) SampleConfig() string {
Expand All @@ -55,9 +58,31 @@ func (g *Graphite) Connect() error {
return err
}

// Only retry the failed servers
servers := g.Servers
if len(g.failedServers) > 0 {
servers = g.failedServers
// Remove failed servers from existing connections
var workingConns []net.Conn
for _, conn := range g.conns {
var found bool
for _, server := range servers {
if conn.RemoteAddr().String() == server {
found = true
break
}
}
if !found {
workingConns = append(workingConns, conn)
}
}
g.conns = workingConns
}

// Get Connections
var conns []net.Conn
for _, server := range g.Servers {
var failedServers []string
for _, server := range servers {
// Dialer with timeout
d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second}

Expand All @@ -71,9 +96,19 @@ func (g *Graphite) Connect() error {

if err == nil {
conns = append(conns, conn)
} else {
g.Log.Debugf("Failed to establish connection: %v", err)
failedServers = append(failedServers, server)
}
}
g.conns = conns

if len(g.failedServers) > 0 {
g.conns = append(g.conns, conns...)
g.failedServers = failedServers
} else {
g.conns = conns
}

return nil
}

Expand All @@ -90,29 +125,35 @@ func (g *Graphite) Close() error {
// We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng for this trick.
func (g *Graphite) checkEOF(conn net.Conn) {
func (g *Graphite) checkEOF(conn net.Conn) error {
b := make([]byte, 1024)

if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
g.Log.Errorf("Couldn't set read deadline for connection %s. closing conn explicitly", conn)
_ = conn.Close()
return
g.Log.Debugf("Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly", err, conn.RemoteAddr().String())
err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}
num, err := conn.Read(b)
if err == io.EOF {
g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn)
_ = conn.Close()
return
g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}
// Just in case I misunderstand something or the remote behaves badly.
if num != 0 {
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
}
// Log non-timeout errors or close.
// Log non-timeout errors and close.
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
_ = conn.Close()
g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}

return nil
}

// Choose a random server in the cluster to write to until a successful write
Expand All @@ -135,10 +176,13 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {

err = g.send(batch)

// try to reconnect and retry to send
if err != nil {
g.Log.Error("Graphite: Reconnecting and retrying...")
_ = g.Connect()
// If a send failed for a server, try to reconnect to that server
if len(g.failedServers) > 0 {
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
err = g.Connect()
if err != nil {
return fmt.Errorf("Failed to reconnect: %v", err)
}
err = g.send(batch)
}

Expand All @@ -147,28 +191,40 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {

func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs
err := errors.New("could not write to any Graphite server in cluster")
globalErr := errors.New("could not write to any Graphite server in cluster")

// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if g.Timeout > 0 {
_ = g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
err := g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
if err != nil {
g.Log.Errorf("failed to set write deadline for %s: %v", g.conns[n].RemoteAddr().String(), err)
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
}
}
err := g.checkEOF(g.conns[n])
if err != nil {
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
break
}
g.checkEOF(g.conns[n])
if _, e := g.conns[n].Write(batch); e != nil {
// Error
g.Log.Errorf("Graphite Error: " + e.Error())
g.Log.Debugf("Graphite Error: " + e.Error())
// Close explicitly and let's try the next one
_ = g.conns[n].Close()
err := g.conns[n].Close()
g.Log.Debugf("Failed to close the connection: %v", err)
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
} else {
// Success
err = nil
globalErr = nil
break
}
}

return err
return globalErr
}

func init() {
Expand Down
21 changes: 14 additions & 7 deletions plugins/outputs/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func TestGraphiteOK(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithSeparatorDot(t *testing.T) {
Expand Down Expand Up @@ -160,7 +161,8 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
Expand Down Expand Up @@ -222,7 +224,8 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
Expand Down Expand Up @@ -288,7 +291,8 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithTags(t *testing.T) {
Expand Down Expand Up @@ -350,7 +354,8 @@ func TestGraphiteOkWithTags(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
Expand Down Expand Up @@ -413,7 +418,8 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
Expand Down Expand Up @@ -476,7 +482,8 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
Expand Down

0 comments on commit beb18d9

Please sign in to comment.