From 0649aa0c3a771949f71e86ce4c5343d3291cc8ad Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Fri, 24 May 2024 13:43:05 +0200 Subject: [PATCH 1/2] uds: add exponential backoff for reconnect --- statsd/uds.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/statsd/uds.go b/statsd/uds.go index 09518992..5f6afa20 100644 --- a/statsd/uds.go +++ b/statsd/uds.go @@ -21,6 +21,8 @@ type udsWriter struct { conn net.Conn // write timeout writeTimeout time.Duration + // current write timeout + currentWriteTimeout time.Duration // connect timeout connectTimeout time.Duration sync.RWMutex // used to lock conn / writer can replace it @@ -29,7 +31,14 @@ type udsWriter struct { // newUDSWriter returns a pointer to a new udsWriter given a socket file path as addr. func newUDSWriter(addr string, writeTimeout time.Duration, connectTimeout time.Duration, transport string) (*udsWriter, error) { // Defer connection to first Write - writer := &udsWriter{addr: addr, transport: transport, conn: nil, writeTimeout: writeTimeout, connectTimeout: connectTimeout} + writer := &udsWriter{ + addr: addr, + transport: transport, + conn: nil, + writeTimeout: writeTimeout, + currentWriteTimeout: writeTimeout, + connectTimeout: connectTimeout, + } return writer, nil } @@ -70,7 +79,7 @@ func (w *udsWriter) Write(data []byte) (int, error) { // When using streams the deadline will only make us drop the packet if we can't write it at all, // once we've started writing we need to finish. - conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) + _ = conn.SetWriteDeadline(time.Now().Add(w.currentWriteTimeout)) // When using streams, we append the length of the packet to the data if stream { @@ -82,7 +91,7 @@ func (w *udsWriter) Write(data []byte) (int, error) { // W need to be able to finish to write partially written packets once we have started. // But we will reset the connection if we can't write anything at all for a long time. - w.conn.SetWriteDeadline(time.Now().Add(w.connectTimeout)) + _ = w.conn.SetWriteDeadline(time.Now().Add(w.connectTimeout)) // Continue writing only if we've written the length of the packet if err == nil { @@ -96,11 +105,28 @@ func (w *udsWriter) Write(data []byte) (int, error) { } if w.shouldCloseConnection(err, partialWrite) { + w.maybeIncreaseTimeout(stream) w.unsetConnection() } return n, err } +func (w *udsWriter) maybeIncreaseTimeout(stream bool) { + if stream { + // For uds-stream we want to gradually increase the write timeout + // up to the connect timeout to avoid overloading the agent with new connections. + // We increase by 20% or 500ms up to the connection timeout. + incr := w.currentWriteTimeout / 100 * 20 + if incr == time.Duration(0) { + incr = time.Millisecond * 500 + } + w.currentWriteTimeout += incr + if w.currentWriteTimeout > w.connectTimeout { + w.currentWriteTimeout = w.connectTimeout + } + } +} + func (w *udsWriter) Close() error { if w.conn != nil { return w.conn.Close() @@ -154,6 +180,8 @@ func (w *udsWriter) ensureConnection() (net.Conn, error) { if err != nil { return nil, err } + // reset the write timeout + w.currentWriteTimeout = w.writeTimeout w.conn = newConn w.transport = newConn.RemoteAddr().Network() return newConn, nil From 1785ac407bc8abfc6b9340640fc9862c0dd90f4c Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Mon, 27 May 2024 16:06:13 +0200 Subject: [PATCH 2/2] Validate that write timeout is greater than connection timeout --- statsd/statsd.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/statsd/statsd.go b/statsd/statsd.go index bfdef0f9..f10508a3 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -370,6 +370,9 @@ func parseAgentURL(agentURL string) string { func createWriter(addr string, writeTimeout time.Duration, connectTimeout time.Duration) (Transport, string, error) { addr = resolveAddr(addr) + if writeTimeout > connectTimeout { + return nil, "", errors.New("write timeout cannot be greater than connect timeout") + } if addr == "" { return nil, "", errors.New("No address passed and autodetection from environment failed") }