Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.graphite): Retry connecting to servers with failed send attempts #11439

Merged
merged 5 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 80 additions & 24 deletions plugins/outputs/graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"math/rand"
"net"
"strings"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -34,6 +36,7 @@ type Graphite struct {

conns []net.Conn
tlsint.ClientConfig
failedServers []string
}

func (*Graphite) SampleConfig() string {
Expand All @@ -55,9 +58,31 @@ func (g *Graphite) Connect() error {
return err
}

// Only retry the failed servers
servers := g.Servers
if len(g.failedServers) > 0 {
servers = g.failedServers
// Remove failed server from existing connections
var workingConns []net.Conn
for _, conn := range g.conns {
var found bool
for _, server := range servers {
if conn.RemoteAddr().String() == server {
found = true
break
}
}
if !found {
workingConns = append(workingConns, conn)
}
}
g.conns = workingConns
}

// Get Connections
var conns []net.Conn
for _, server := range g.Servers {
var failedServers []string
for _, server := range servers {
// Dialer with timeout
d := net.Dialer{Timeout: time.Duration(g.Timeout) * time.Second}

Expand All @@ -71,9 +96,19 @@ func (g *Graphite) Connect() error {

if err == nil {
conns = append(conns, conn)
} else {
g.Log.Debugf("Failed to establish connection: %v", err)
failedServers = append(failedServers, server)
}
}
g.conns = conns

if len(g.failedServers) > 0 {
g.conns = append(g.conns, conns...)
g.failedServers = failedServers
} else {
g.conns = conns
}

return nil
}

Expand All @@ -90,29 +125,35 @@ func (g *Graphite) Close() error {
// We can detect that by finding an eof
// if not for this, we can happily write and flush without getting errors (in Go) but getting RST tcp packets back (!)
// props to Tv via the authors of carbon-relay-ng for this trick.
func (g *Graphite) checkEOF(conn net.Conn) {
func (g *Graphite) checkEOF(conn net.Conn) error {
b := make([]byte, 1024)

if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
g.Log.Errorf("Couldn't set read deadline for connection %s. closing conn explicitly", conn)
_ = conn.Close()
return
g.Log.Debugf("Couldn't set read deadline for connection due to error %v with remote address %s. closing conn explicitly", err, conn.RemoteAddr().String())
err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}
num, err := conn.Read(b)
if err == io.EOF {
g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn)
_ = conn.Close()
return
g.Log.Debugf("Conn %s is closed. closing conn explicitly", conn.RemoteAddr().String())
err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}
// just in case I misunderstand something or the remote behaves badly
if num != 0 {
g.Log.Infof("conn %s .conn.Read data? did not expect that. data: %s", conn, b[:num])
}
// Log non-timeout errors or close.
// Log non-timeout errors and close.
if e, ok := err.(net.Error); !(ok && e.Timeout()) {
g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
_ = conn.Close()
g.Log.Debugf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
err = conn.Close()
g.Log.Debugf("Failed to close the connection: %v", err)
return err
}

return nil
}

// Choose a random server in the cluster to write to until a successful write
Expand All @@ -135,10 +176,13 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {

err = g.send(batch)

// try to reconnect and retry to send
if err != nil {
g.Log.Error("Graphite: Reconnecting and retrying...")
_ = g.Connect()
// If a send failed for a server, try to reconnect to that server
if len(g.failedServers) > 0 {
g.Log.Debugf("Reconnecting and retrying for the following servers: %s", strings.Join(g.failedServers, ","))
err = g.Connect()
if err != nil {
return fmt.Errorf("Failed to reconnect: %v", err)
}
err = g.send(batch)
}

Expand All @@ -147,28 +191,40 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {

func (g *Graphite) send(batch []byte) error {
// This will get set to nil if a successful write occurs
err := errors.New("could not write to any Graphite server in cluster")
globalErr := errors.New("could not write to any Graphite server in cluster")

// Send data to a random server
p := rand.Perm(len(g.conns))
for _, n := range p {
if g.Timeout > 0 {
_ = g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
err := g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
if err != nil {
g.Log.Errorf("failed to set write deadline for %s: %v", g.conns[n].RemoteAddr().String(), err)
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
}
}
err := g.checkEOF(g.conns[n])
if err != nil {
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
break
}
g.checkEOF(g.conns[n])
if _, e := g.conns[n].Write(batch); e != nil {
// Error
g.Log.Errorf("Graphite Error: " + e.Error())
g.Log.Debugf("Graphite Error: " + e.Error())
// Close explicitly and let's try the next one
_ = g.conns[n].Close()
err := g.conns[n].Close()
g.Log.Debugf("Failed to close the connection: %v", err)
// Mark server as failed so a new connection will be made
g.failedServers = append(g.failedServers, g.conns[n].RemoteAddr().String())
} else {
// Success
err = nil
globalErr = nil
break
}
}

return err
return globalErr
}

func init() {
Expand Down
21 changes: 14 additions & 7 deletions plugins/outputs/graphite/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func TestGraphiteOK(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithSeparatorDot(t *testing.T) {
Expand Down Expand Up @@ -160,7 +161,8 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
Expand Down Expand Up @@ -222,7 +224,8 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
Expand Down Expand Up @@ -288,7 +291,8 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithTags(t *testing.T) {
Expand Down Expand Up @@ -350,7 +354,8 @@ func TestGraphiteOkWithTags(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
Expand Down Expand Up @@ -413,7 +418,8 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
Expand Down Expand Up @@ -476,7 +482,8 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
require.NoError(t, err3)
t.Log("Finished writing third data")
wg2.Wait()
g.Close()
err := g.Close()
require.NoError(t, err)
}

func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
Expand Down