Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uds: add exponential backoff write timeout #307

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions statsd/uds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type udsWriter struct {
conn net.Conn
// write timeout
writeTimeout time.Duration
// current write timeout
currentWriteTimeout time.Duration
// connect timeout
connectTimeout time.Duration
sync.RWMutex // used to lock conn / writer can replace it
Expand All @@ -29,7 +31,14 @@ type udsWriter struct {
// newUDSWriter returns a pointer to a new udsWriter given a socket file path as addr.
func newUDSWriter(addr string, writeTimeout time.Duration, connectTimeout time.Duration, transport string) (*udsWriter, error) {
// Defer connection to first Write
writer := &udsWriter{addr: addr, transport: transport, conn: nil, writeTimeout: writeTimeout, connectTimeout: connectTimeout}
writer := &udsWriter{
addr: addr,
transport: transport,
conn: nil,
writeTimeout: writeTimeout,
currentWriteTimeout: writeTimeout,
connectTimeout: connectTimeout,
}
return writer, nil
}

Expand Down Expand Up @@ -70,7 +79,7 @@ func (w *udsWriter) Write(data []byte) (int, error) {

// When using streams the deadline will only make us drop the packet if we can't write it at all,
// once we've started writing we need to finish.
conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))
_ = conn.SetWriteDeadline(time.Now().Add(w.currentWriteTimeout))

// When using streams, we append the length of the packet to the data
if stream {
Expand All @@ -82,7 +91,7 @@ func (w *udsWriter) Write(data []byte) (int, error) {

// W need to be able to finish to write partially written packets once we have started.
// But we will reset the connection if we can't write anything at all for a long time.
w.conn.SetWriteDeadline(time.Now().Add(w.connectTimeout))
_ = w.conn.SetWriteDeadline(time.Now().Add(w.connectTimeout))

// Continue writing only if we've written the length of the packet
if err == nil {
Expand All @@ -96,11 +105,28 @@ func (w *udsWriter) Write(data []byte) (int, error) {
}

if w.shouldCloseConnection(err, partialWrite) {
w.maybeIncreaseTimeout(stream)
w.unsetConnection()
}
return n, err
}

func (w *udsWriter) maybeIncreaseTimeout(stream bool) {
if stream {
// For uds-stream we want to gradually increase the write timeout
// up to the connect timeout to avoid overloading the agent with new connections.
// We increase by 20% or 500ms up to the connection timeout.
incr := w.currentWriteTimeout / 100 * 20
if incr == time.Duration(0) {
incr = time.Millisecond * 500
}
w.currentWriteTimeout += incr
if w.currentWriteTimeout > w.connectTimeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that connection timeout is larger than the write timeout, although this might not be always the case. Would it make sense to ensure that connectTimeout is at least as big as writeTimeout, or perhaps use a separate upper bound?

w.currentWriteTimeout = w.connectTimeout
}
}
}

func (w *udsWriter) Close() error {
if w.conn != nil {
return w.conn.Close()
Expand Down Expand Up @@ -154,6 +180,8 @@ func (w *udsWriter) ensureConnection() (net.Conn, error) {
if err != nil {
return nil, err
}
// reset the write timeout
w.currentWriteTimeout = w.writeTimeout
w.conn = newConn
w.transport = newConn.RemoteAddr().Network()
return newConn, nil
Expand Down
Loading