diff --git a/gbn/config.go b/gbn/config.go
index 6a027b4d..1c5b68ec 100644
--- a/gbn/config.go
+++ b/gbn/config.go
@@ -2,6 +2,82 @@ package gbn
 
 import "time"
 
+// TimeoutOptions can be used to modify the default timeout values used within
+// the TimeoutManager.
+type TimeoutOptions func(manager *TimeoutManager)
+
+// WithStaticResendTimeout is used to set a static resend timeout. This is the
+// time to wait for ACKs before resending the queue.
+func WithStaticResendTimeout(timeout time.Duration) TimeoutOptions {
+	return func(manager *TimeoutManager) {
+		manager.useStaticTimeout = true
+		manager.resendTimeout = timeout
+	}
+}
+
+// WithResendMultiplier is used to set the resend multiplier. This is the
+// multiplier we use when dynamically setting the resend timeout, based on how
+// long it took for the other party to respond.
+// Note that when setting the resend timeout manually with the
+// WithStaticResendTimeout option, this option will have no effect.
+// Note that the passed multiplier must be greater than zero or this option
+// will have no effect.
+func WithResendMultiplier(multiplier int) TimeoutOptions {
+	return func(manager *TimeoutManager) {
+		if multiplier > 0 {
+			manager.resendMultiplier = multiplier
+		}
+	}
+}
+
+// WithTimeoutUpdateFrequency is used to set how many corresponding responses
+// we need to receive before the resend timeout is updated.
+// Note that when setting the resend timeout manually with the
+// WithStaticResendTimeout option, this option will have no effect.
+// Also note that the passed frequency must be greater than zero or this
+// option will have no effect.
+func WithTimeoutUpdateFrequency(frequency int) TimeoutOptions {
+	return func(manager *TimeoutManager) {
+		if frequency > 0 {
+			manager.timeoutUpdateFrequency = frequency
+		}
+	}
+}
+
+// WithHandshakeTimeout is used to set the timeout used during the handshake.
+// If the timeout is reached without response from the peer then the handshake
+// will be aborted and restarted.
+func WithHandshakeTimeout(timeout time.Duration) TimeoutOptions {
+	return func(manager *TimeoutManager) {
+		manager.handshakeTimeout = timeout
+	}
+}
+
+// WithKeepalivePing is used to send a ping packet if no packets have been
+// received from the other side for the given ping duration. This helps keep
+// the connection alive and also ensures that the connection is closed if the
+// other side does not respond to the ping in a timely manner. After the ping,
+// the connection will be closed if the other side does not respond within the
+// pong duration.
+func WithKeepalivePing(ping, pong time.Duration) TimeoutOptions {
+	return func(manager *TimeoutManager) {
+		manager.pingTime = ping
+		manager.pongTime = pong
+	}
+}
+
+// WithBoostPercent is used to set the boost percent that the timeout manager
+// will use to boost the resend timeout & handshake timeout every time a resend
+// is required due to not receiving a response within the current timeout.
+func WithBoostPercent(boostPercent float32) TimeoutOptions {
+	return func(manager *TimeoutManager) {
+		if boostPercent > 0 {
+			manager.resendBoostPercent = boostPercent
+			manager.handshakeBoostPercent = boostPercent
+		}
+	}
+}
+
 // config holds the configuration values for an instance of GoBackNConn.
 type config struct {
 	// n is the window size. The sender can send a maximum of n packets
@@ -26,10 +102,6 @@ type config struct {
 	// between packets.
 	maxChunkSize int
 
-	// resendTimeout is the duration that will be waited before resending
-	// the packets in the current queue.
- resendTimeout time.Duration - // recvFromStream is the function that will be used to acquire the next // available packet. recvFromStream recvBytesFunc @@ -42,13 +114,7 @@ type config struct { // been received and processed. onFIN func() - // handshakeTimeout is the time after which the server or client - // will abort and restart the handshake if the expected response is - // not received from the peer. - handshakeTimeout time.Duration - - pingTime time.Duration - pongTime time.Duration + timeoutOptions []TimeoutOptions } // newConfig constructs a new config struct. @@ -56,11 +122,9 @@ func newConfig(sendFunc sendBytesFunc, recvFunc recvBytesFunc, n uint8) *config { return &config{ - n: n, - s: n + 1, - recvFromStream: recvFunc, - sendToStream: sendFunc, - resendTimeout: defaultResendTimeout, - handshakeTimeout: defaultHandshakeTimeout, + n: n, + s: n + 1, + recvFromStream: recvFunc, + sendToStream: sendFunc, } } diff --git a/gbn/gbn_client.go b/gbn/gbn_client.go index c1032e26..1c57c859 100644 --- a/gbn/gbn_client.go +++ b/gbn/gbn_client.go @@ -99,6 +99,7 @@ func (g *GoBackNConn) clientHandshake() error { var ( resp Message respSYN *PacketSYN + resent bool ) handshake: for { @@ -115,6 +116,9 @@ handshake: return err } + // Notify the timeout manager that we sent a SYN. + g.timeoutManager.Sent(msg, resent) + for { // Wait for SYN g.log.Debugf("Waiting for SYN") @@ -128,11 +132,14 @@ handshake: default: } + timeout := g.timeoutManager.GetHandshakeTimeout() + var b []byte select { - case <-time.After(g.cfg.handshakeTimeout): + case <-time.After(timeout): g.log.Debugf("SYN resendTimeout. Resending " + "SYN.") + resent = true continue handshake case <-g.quit: @@ -171,6 +178,10 @@ handshake: return io.EOF } + // Notify the timeout manager we've received the SYN response from the + // counterparty. + g.timeoutManager.Received(resp) + // Send SYNACK g.log.Debugf("Sending SYNACK") synack, err := new(PacketSYNACK).Serialize() diff --git a/gbn/gbn_conn.go b/gbn/gbn_conn.go index 50a0cc93..744462ca 100644 --- a/gbn/gbn_conn.go +++ b/gbn/gbn_conn.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math" "sync" "time" @@ -21,12 +20,7 @@ var ( ) const ( - DefaultN = 20 - defaultHandshakeTimeout = 100 * time.Millisecond - defaultResendTimeout = 100 * time.Millisecond - finSendTimeout = 1000 * time.Millisecond - DefaultSendTimeout = math.MaxInt64 - DefaultRecvTimeout = math.MaxInt64 + DefaultN = 20 ) type sendBytesFunc func(ctx context.Context, b []byte) error @@ -46,10 +40,6 @@ type GoBackNConn struct { recvDataChan chan *PacketData sendDataChan chan *PacketData - sendTimeout time.Duration - recvTimeout time.Duration - timeoutsMu sync.RWMutex - log btclog.Logger // receivedACKSignal channel is used to signal that the queue size has @@ -70,6 +60,10 @@ type GoBackNConn struct { // remoteClosed is closed if the remote party initiated the FIN sequence. remoteClosed chan struct{} + // timeoutManager is used to manage all the timeouts used by the + // GoBackNConn. + timeoutManager *TimeoutManager + // quit is used to stop the normal operations of the connection. // Once closed, the send and receive streams will still be available // for the FIN sequence. @@ -89,12 +83,12 @@ func newGoBackNConn(ctx context.Context, cfg *config, prefix := fmt.Sprintf("(%s)", loggerPrefix) plog := build.NewPrefixLog(prefix, log) + timeoutManager := NewTimeOutManager(plog, cfg.timeoutOptions...) 
+ g := &GoBackNConn{ cfg: cfg, recvDataChan: make(chan *PacketData, cfg.n), sendDataChan: make(chan *PacketData), - recvTimeout: DefaultRecvTimeout, - sendTimeout: DefaultSendTimeout, receivedACKSignal: make(chan struct{}), resendSignal: make(chan struct{}, 1), remoteClosed: make(chan struct{}), @@ -102,50 +96,49 @@ func newGoBackNConn(ctx context.Context, cfg *config, cancel: cancel, log: plog, quit: make(chan struct{}), + timeoutManager: timeoutManager, } - g.sendQueue = newQueue(&queueCfg{ - s: cfg.n + 1, - timeout: cfg.resendTimeout, - log: plog, - sendPkt: func(packet *PacketData) error { - return g.sendPacket(g.ctx, packet) + g.sendQueue = newQueue( + &queueCfg{ + s: cfg.n + 1, + log: plog, + sendPkt: func(packet *PacketData) error { + return g.sendPacket(g.ctx, packet, true) + }, }, - }) + timeoutManager, + ) return g } -// setN sets the current N to use. This _must_ be set before the handshake is -// completed. -func (g *GoBackNConn) setN(n uint8) { - g.cfg.n = n - g.cfg.s = n + 1 - g.recvDataChan = make(chan *PacketData, n) - g.sendQueue = newQueue(&queueCfg{ - s: n + 1, - timeout: g.cfg.resendTimeout, - log: g.log, - sendPkt: func(packet *PacketData) error { - return g.sendPacket(g.ctx, packet) - }, - }) -} - // SetSendTimeout sets the timeout used in the Send function. func (g *GoBackNConn) SetSendTimeout(timeout time.Duration) { - g.timeoutsMu.Lock() - defer g.timeoutsMu.Unlock() - - g.sendTimeout = timeout + g.timeoutManager.SetSendTimeout(timeout) } // SetRecvTimeout sets the timeout used in the Recv function. func (g *GoBackNConn) SetRecvTimeout(timeout time.Duration) { - g.timeoutsMu.Lock() - defer g.timeoutsMu.Unlock() + g.timeoutManager.SetRecvTimeout(timeout) +} - g.recvTimeout = timeout +// setN sets the current N to use. This _must_ be set before the handshake is +// completed. +func (g *GoBackNConn) setN(n uint8) { + g.cfg.n = n + g.cfg.s = n + 1 + g.recvDataChan = make(chan *PacketData, n) + g.sendQueue = newQueue( + &queueCfg{ + s: n + 1, + log: g.log, + sendPkt: func(packet *PacketData) error { + return g.sendPacket(g.ctx, packet, true) + }, + }, + g.timeoutManager, + ) } // Send blocks until an ack is received for the packet sent N packets before. 
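The refactor above routes every timeout lookup through the new TimeoutManager instead of static config fields. The following is a minimal, self-contained sketch of that flow, using only identifiers added elsewhere in this diff and assuming the usual github.com/lightninglabs/lightning-node-connect module path; driving the manager outside a real GoBackNConn like this is purely illustrative:

package main

import (
	"fmt"

	"github.com/lightninglabs/lightning-node-connect/gbn"
)

func main() {
	// Update the resend timeout after every other packet/response pair,
	// mirroring the benchmark setup in timeout_manager_test.go.
	tm := gbn.NewTimeOutManager(nil, gbn.WithTimeoutUpdateFrequency(2))

	// The send path reports every outgoing data packet; false means this
	// is an original send rather than a resend.
	tm.Sent(&gbn.PacketData{Seq: 7}, false)

	// When the matching ACK arrives, the manager measures the response
	// time and scales it by the resend multiplier to derive a new resend
	// timeout, clamped to the package's minimum value.
	tm.Received(&gbn.PacketACK{Seq: 7})

	// The send queue and the resend ticker consult the manager rather
	// than a fixed config value.
	fmt.Println(tm.GetResendTimeout(), tm.GetHandshakeTimeout())
}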
@@ -157,9 +150,7 @@ func (g *GoBackNConn) Send(data []byte) error { default: } - g.timeoutsMu.RLock() - ticker := time.NewTimer(g.sendTimeout) - g.timeoutsMu.RUnlock() + ticker := time.NewTimer(g.timeoutManager.GetSendTimeout()) defer ticker.Stop() sendPacket := func(packet *PacketData) error { @@ -221,9 +212,7 @@ func (g *GoBackNConn) Recv() ([]byte, error) { msg *PacketData ) - g.timeoutsMu.RLock() - ticker := time.NewTimer(g.recvTimeout) - g.timeoutsMu.RUnlock() + ticker := time.NewTimer(g.timeoutManager.GetRecvTimeout()) defer ticker.Stop() for { @@ -250,22 +239,16 @@ func (g *GoBackNConn) Recv() ([]byte, error) { func (g *GoBackNConn) start() { g.log.Debugf("Starting") - pingTime := time.Duration(math.MaxInt64) - if g.cfg.pingTime != 0 { - pingTime = g.cfg.pingTime - } - - g.pingTicker = NewIntervalAwareForceTicker(pingTime) + g.pingTicker = NewIntervalAwareForceTicker( + g.timeoutManager.GetPingTime(), + ) g.pingTicker.Resume() - pongTime := time.Duration(math.MaxInt64) - if g.cfg.pongTime != 0 { - pongTime = g.cfg.pongTime - } - - g.pongTicker = NewIntervalAwareForceTicker(pongTime) + g.pongTicker = NewIntervalAwareForceTicker( + g.timeoutManager.GetPongTime(), + ) - g.resendTicker = time.NewTicker(g.cfg.resendTimeout) + g.resendTicker = time.NewTicker(g.timeoutManager.GetResendTimeout()) g.wg.Add(1) go func() { @@ -322,10 +305,12 @@ func (g *GoBackNConn) Close() error { g.log.Tracef("Try sending FIN") ctxc, cancel := context.WithTimeout( - g.ctx, finSendTimeout, + g.ctx, g.timeoutManager.GetFinSendTimeout(), ) defer cancel() - if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil { + + err := g.sendPacket(ctxc, &PacketFIN{}, false) + if err != nil { g.log.Errorf("Error sending FIN: %v", err) } } @@ -353,7 +338,9 @@ func (g *GoBackNConn) Close() error { } // sendPacket serializes a message and writes it to the underlying send stream. -func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error { +func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message, + isResend bool) error { + b, err := msg.Serialize() if err != nil { return fmt.Errorf("serialize error: %s", err) @@ -364,6 +351,9 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error { return fmt.Errorf("error calling sendToStream: %s", err) } + // Notify the timeout manager that a message has been sent. + g.timeoutManager.Sent(msg, isResend) + return nil } @@ -387,7 +377,7 @@ func (g *GoBackNConn) sendPacketsForever() error { // execute. That can happen if the function was awaiting the // expected ACK for a long time, or times out while awaiting the // catch up. - g.resendTicker.Reset(g.cfg.resendTimeout) + g.resendTicker.Reset(g.timeoutManager.GetResendTimeout()) // Also drain the resend signal channel, as resendTicker.Reset // doesn't drain the channel if the ticker ticked during the @@ -423,11 +413,26 @@ func (g *GoBackNConn) sendPacketsForever() error { continue case <-g.pingTicker.Ticks(): + // If we have expected a sync after sending the previous + // ping, both the pingTicker and pongTicker may have + // ticked when waiting to sync. In that case, we can't + // be sure which of the signals we receive over first in + // the above select. We therefore need to check if the + // pong ticker has ticked here to ensure that it get's + // prioritized over the ping ticker. + select { + case <-g.pongTicker.Ticks(): + return errKeepaliveTimeout + default: + } // Start the pong timer. g.pongTicker.Reset() g.pongTicker.Resume() + // Also reset the ping timer. 
+ g.pingTicker.Reset() + g.log.Tracef("Sending a PING packet") packet = &PacketData{ @@ -445,7 +450,7 @@ func (g *GoBackNConn) sendPacketsForever() error { g.sendQueue.addPacket(packet) g.log.Tracef("Sending data %d", packet.Seq) - if err := g.sendPacket(g.ctx, packet); err != nil { + if err := g.sendPacket(g.ctx, packet, false); err != nil { return err } @@ -507,6 +512,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo return fmt.Errorf("deserialize error: %s", err) } + // Notify the timeout manager that a message has been received. + g.timeoutManager.Received(msg) + // Reset the ping & pong timer if any packet is received. // If ping/pong is disabled, this is a no-op. g.pingTicker.Reset() @@ -514,7 +522,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo g.pongTicker.Pause() } - g.resendTicker.Reset(g.cfg.resendTimeout) + g.resendTicker.Reset(g.timeoutManager.GetResendTimeout()) switch m := msg.(type) { case *PacketData: @@ -531,7 +539,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo Seq: m.Seq, } - if err = g.sendPacket(g.ctx, ack); err != nil { + err = g.sendPacket(g.ctx, ack, false) + if err != nil { return err } @@ -572,8 +581,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo // the resend, and therefore won't react to the // NACK we send here in time. sinceSent := time.Since(lastNackTime) - recentlySent := sinceSent < - g.cfg.resendTimeout*2 + + timeout := g.timeoutManager.GetResendTimeout() + recentlySent := sinceSent < timeout*2 if lastNackSeq == g.recvSeq && recentlySent { g.log.Tracef("Recently sent NACK") @@ -589,7 +599,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo Seq: g.recvSeq, } - if err = g.sendPacket(g.ctx, nack); err != nil { + err = g.sendPacket(g.ctx, nack, false) + if err != nil { return err } diff --git a/gbn/gbn_server.go b/gbn/gbn_server.go index 68e3b5b7..dc53ca9d 100644 --- a/gbn/gbn_server.go +++ b/gbn/gbn_server.go @@ -82,7 +82,9 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo }() var n uint8 + var resent bool +handshakeLoop: for { g.log.Debugf("Waiting for client SYN") select { @@ -110,6 +112,24 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo switch msg.(type) { case *PacketSYN: + + case *PacketSYNACK, *PacketData: + // If we receive a SYNACK or DATA packet after we have + // restarted the handshake, we can be sure that the + // client has received our SYN and has completed the + // handshake. We can therefore complete the handshake + // ourselves. + if resent { + g.log.Tracef("Received %T after restarting "+ + "handshake", msg) + g.timeoutManager.Received(msg) + + break handshakeLoop + } + + g.log.Tracef("Expected SYN, got %T", msg) + + continue default: g.log.Tracef("Expected SYN, got %T", msg) continue @@ -131,6 +151,9 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo return err } + // Notify the timeout manager that we sent a SYN. + g.timeoutManager.Sent(msg, resent) + // Wait for SYNACK g.log.Debugf("Waiting for client SYNACK") select { @@ -143,9 +166,11 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo } select { - case <-time.After(g.cfg.handshakeTimeout): + case <-time.After(g.timeoutManager.GetHandshakeTimeout()): g.log.Debugf("SYNCACK resendTimeout. 
Abort and wait " + "for client to re-initiate") + resent = true + continue case err := <-errChan: return err @@ -163,9 +188,17 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo switch msg.(type) { case *PacketSYNACK: + g.log.Debugf("Received SYNACK") + + // Notify the timeout manager we've received the SYNACK + // response from the counterparty. + g.timeoutManager.Received(msg) + break case *PacketSYN: g.log.Debugf("Received SYN. Resend SYN.") + resent = true + goto recvClientSYN default: return io.EOF @@ -173,8 +206,6 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo break } - g.log.Debugf("Received SYNACK") - // Set all variables that are dependent on the value of N that we get // from the client g.setN(n) diff --git a/gbn/options.go b/gbn/options.go index 4adcb3a0..ca55b955 100644 --- a/gbn/options.go +++ b/gbn/options.go @@ -1,7 +1,5 @@ package gbn -import "time" - type Option func(conn *config) // WithMaxSendSize is used to set the maximum payload size in bytes per packet. @@ -14,33 +12,11 @@ func WithMaxSendSize(size int) Option { } } -// WithTimeout is used to set the resend timeout. This is the time to wait -// for ACKs before resending the queue. -func WithTimeout(timeout time.Duration) Option { - return func(conn *config) { - conn.resendTimeout = timeout - } -} - -// WithHandshakeTimeout is used to set the timeout used during the handshake. -// If the timeout is reached without response from the peer then the handshake -// will be aborted and restarted. -func WithHandshakeTimeout(timeout time.Duration) Option { - return func(conn *config) { - conn.handshakeTimeout = timeout - } -} - -// WithKeepalivePing is used to send a ping packet if no packets have been -// received from the other side for the given duration. This helps keep the -// connection alive and also ensures that the connection is closed if the -// other side does not respond to the ping in a timely manner. After the ping -// the connection will be closed if the other side does not respond within -// time duration. -func WithKeepalivePing(ping, pong time.Duration) Option { +// WithTimeoutOptions is used to set the different timeout options that will be +// used within gbn package. +func WithTimeoutOptions(opts ...TimeoutOptions) Option { return func(conn *config) { - conn.pingTime = ping - conn.pongTime = pong + conn.timeoutOptions = opts } } diff --git a/gbn/queue.go b/gbn/queue.go index 99954ba3..0bbe654c 100644 --- a/gbn/queue.go +++ b/gbn/queue.go @@ -17,8 +17,6 @@ type queueCfg struct { // no way to tell. s uint8 - timeout time.Duration - log btclog.Logger sendPkt func(packet *PacketData) error @@ -29,6 +27,8 @@ type queueCfg struct { type queue struct { cfg *queueCfg + timeoutManager *TimeoutManager + // content is the current content of the queue. This is always a slice // of length s but can contain nil elements if the queue isn't full. content []*PacketData @@ -59,18 +59,19 @@ type queue struct { } // newQueue creates a new queue. 
-func newQueue(cfg *queueCfg) *queue { +func newQueue(cfg *queueCfg, timeoutManager *TimeoutManager) *queue { if cfg.log == nil { cfg.log = log } q := &queue{ - cfg: cfg, - content: make([]*PacketData, cfg.s), - quit: make(chan struct{}), + cfg: cfg, + content: make([]*PacketData, cfg.s), + quit: make(chan struct{}), + timeoutManager: timeoutManager, } - q.syncer = newSyncer(cfg.s, cfg.log, cfg.timeout, q.quit) + q.syncer = newSyncer(cfg.s, cfg.log, timeoutManager, q.quit) return q } @@ -108,7 +109,7 @@ func (q *queue) addPacket(packet *PacketData) { // two parties to be seen as synced; this may fail in which case the caller is // expected to call resend again. func (q *queue) resend() error { - if time.Since(q.lastResend) < q.cfg.timeout { + if time.Since(q.lastResend) < q.timeoutManager.GetHandshakeTimeout() { q.cfg.log.Tracef("Resent the queue recently.") return nil diff --git a/gbn/queue_test.go b/gbn/queue_test.go index be8e53f4..f70b32d0 100644 --- a/gbn/queue_test.go +++ b/gbn/queue_test.go @@ -10,7 +10,7 @@ import ( ) func TestQueueSize(t *testing.T) { - q := newQueue(&queueCfg{s: 4}) + q := newQueue(&queueCfg{s: 4}, NewTimeOutManager(nil)) require.Equal(t, uint8(0), q.size()) @@ -33,16 +33,18 @@ func TestQueueResend(t *testing.T) { resentPackets := make(map[uint8]struct{}) queueTimeout := time.Second * 1 + tm := NewTimeOutManager(nil) + tm.resendTimeout = queueTimeout + cfg := &queueCfg{ - s: 5, - timeout: queueTimeout, + s: 5, sendPkt: func(packet *PacketData) error { resentPackets[packet.Seq] = struct{}{} return nil }, } - q := newQueue(cfg) + q := newQueue(cfg, tm) pkt1 := &PacketData{Seq: 1} pkt2 := &PacketData{Seq: 2} diff --git a/gbn/syncer.go b/gbn/syncer.go index 2289023a..2fd9dae1 100644 --- a/gbn/syncer.go +++ b/gbn/syncer.go @@ -99,9 +99,9 @@ const ( // When either of the 3 conditions above are met, we will consider both parties // to be in sync. type syncer struct { - s uint8 - log btclog.Logger - timeout time.Duration + s uint8 + log btclog.Logger + timeoutManager *TimeoutManager state syncState @@ -127,20 +127,20 @@ type syncer struct { } // newSyncer creates a new syncer instance. 
-func newSyncer(s uint8, prefixLogger btclog.Logger, timeout time.Duration, - quit chan struct{}) *syncer { +func newSyncer(s uint8, prefixLogger btclog.Logger, + timeoutManager *TimeoutManager, quit chan struct{}) *syncer { if prefixLogger == nil { prefixLogger = log } return &syncer{ - s: s, - log: prefixLogger, - timeout: timeout, - state: syncStateIdle, - cancel: make(chan struct{}), - quit: quit, + s: s, + log: prefixLogger, + timeoutManager: timeoutManager, + state: syncStateIdle, + cancel: make(chan struct{}), + quit: quit, } } @@ -210,7 +210,9 @@ func (c *syncer) waitForSync() { case <-c.cancel: c.log.Tracef("sync canceled or reset") - case <-time.After(c.timeout * awaitingTimeoutMultiplier): + case <-time.After( + c.timeoutManager.GetResendTimeout() * awaitingTimeoutMultiplier, + ): c.log.Tracef("Timed out while waiting for sync") } @@ -291,7 +293,7 @@ func (c *syncer) proceedAfterTime() { return - case <-time.After(c.timeout): + case <-time.After(c.timeoutManager.GetResendTimeout()): c.mu.Lock() defer c.mu.Unlock() diff --git a/gbn/syncer_test.go b/gbn/syncer_test.go index eaca3ffb..bcc32823 100644 --- a/gbn/syncer_test.go +++ b/gbn/syncer_test.go @@ -19,7 +19,10 @@ func TestSyncer(t *testing.T) { syncTimeout := time.Second * 1 expectedNACK := uint8(3) - syncer := newSyncer(5, nil, syncTimeout, make(chan struct{})) + tm := NewTimeOutManager(nil) + tm.resendTimeout = syncTimeout + + syncer := newSyncer(5, nil, tm, make(chan struct{})) // Let's first test the scenario where we don't receive the expected // ACK/NACK after initiating the resend. This should trigger a timeout diff --git a/gbn/timeout_manager.go b/gbn/timeout_manager.go new file mode 100644 index 00000000..121eeceb --- /dev/null +++ b/gbn/timeout_manager.go @@ -0,0 +1,513 @@ +package gbn + +import ( + "math" + "sync" + "time" + + "github.com/btcsuite/btclog" +) + +const ( + defaultHandshakeTimeout = 1000 * time.Millisecond + defaultResendTimeout = 1000 * time.Millisecond + minimumResendTimeout = 1000 * time.Millisecond + defaultFinSendTimeout = 1000 * time.Millisecond + defaultResendMultiplier = 5 + defaultTimeoutUpdateFrequency = 100 + defaultBoostPercent = 0.5 + DefaultSendTimeout = math.MaxInt64 + DefaultRecvTimeout = math.MaxInt64 +) + +// TimeoutBooster is used to boost a timeout by a given percentage value. +// The timeout will be boosted by the percentage value of the boostPercent any +// time the Boost function is called, and is cumulative. +type TimeoutBooster struct { + // boostPercent defines the percentage value the original timeout will + // be boosted any time the Boost function is called. + boostPercent float32 + + // boostCount defines the number of times the timeout has been boosted. + boostCount int + + // originalTimeout defines the base timeout value that is boosted. + originalTimeout time.Duration + + // withBoostFrequencyLimit is used to indicate whether there is a cap to + // how often the timeout can be boosted, which is the duration of the + // original timeout. + withBoostFrequencyLimit bool + + // lastBoost defines the time when the last boost that had any affect + // was applied. + lastBoost time.Time + + mu sync.Mutex +} + +// NewTimeoutBooster creates a new timeout booster. The originalTimeout defines +// the base timeout value that is boosted. The timeout will be boosted by the +// percentage value of the boostPercent any time the Boost function is called. 
+// Finally if the withBoostFrequencyLimit is set, then there is a cap to how +// often the timeout can be boosted, which is the duration of the original +// timeout. +func NewTimeoutBooster(originalTimeout time.Duration, boostPercent float32, + withBoostFrequencyLimit bool) *TimeoutBooster { + + return &TimeoutBooster{ + boostPercent: boostPercent, + originalTimeout: originalTimeout, + boostCount: 0, + withBoostFrequencyLimit: withBoostFrequencyLimit, + } +} + +// Boost boosts the timeout by the boost percent. If the withBoostFrequencyLimit +// is set, then the boost will only be applied if the duration of the original +// timeout has passed since the last boost that had any affect was applied. +func (b *TimeoutBooster) Boost() { + b.mu.Lock() + defer b.mu.Unlock() + + if b.withBoostFrequencyLimit { + if time.Since(b.lastBoost) < b.originalTimeout { + return + } + } + + b.lastBoost = time.Now() + b.boostCount++ +} + +// Reset removes the current applied boost, and sets the original timeout to the +// passed timeout. It also restarts the frequency limit timeout if the +// withBoostFrequencyLimit was set to true when initializing the TimeoutBooster. +func (b *TimeoutBooster) Reset(newTimeout time.Duration) { + b.mu.Lock() + defer b.mu.Unlock() + + b.boostCount = 0 + b.originalTimeout = newTimeout + + // We'll also restart the frequency timeout, to ensure that any message + // we immediately resend after resetting the booster won't boost the + // timeout. + if b.withBoostFrequencyLimit { + b.lastBoost = time.Now() + } +} + +// GetCurrentTimeout returns the value of the timeout, with the boost applied. +func (b *TimeoutBooster) GetCurrentTimeout() time.Duration { + b.mu.Lock() + defer b.mu.Unlock() + + increase := time.Duration( + float32(b.originalTimeout) * b.boostPercent * + float32(b.boostCount), + ) + + return b.originalTimeout + increase +} + +// TimeoutManager manages the different timeouts used by the gbn package. +type TimeoutManager struct { + // useStaticTimeout is used to indicate whether the resendTimeout + // has been manually set, and if so, should not be updated dynamically. + useStaticTimeout bool + + // hasSetDynamicTimeout is used to indicate whether the resendTimeout + // has ever been set dynamically. + hasSetDynamicTimeout bool + + // resendTimeout defines the current resend timeout used by the + // timeout manager. + // The resend timeout is the duration that will be waited before + // resending the packets in the current queue. The timeout is set + // dynamically, and is set to the time it took for the other party to + // respond, multiplied by the resendMultiplier. + resendTimeout time.Duration + + // resendMultiplier defines the multiplier used when multiplying the + // duration it took for the other party to respond when setting the + // resendTimeout. + resendMultiplier int + + // latestSentSYNTime is used to keep track of the time when the latest + // SYN message was sent. This is used to dynamically set the resend + // timeout, based on how long it took for the other party to respond to + // the SYN message. + latestSentSYNTime time.Time + + // latestSentSYNTimeMu should be locked when updating or accessing the + // latestSentSYNTime field. + latestSentSYNTimeMu sync.Mutex + + // resendBooster is used to boost the resend timeout when we timeout + // when sending a data packet before receiving a response. 
The resend + // timeout will remain boosted until it is updated dynamically, as the + // timeout set during the dynamic update most accurately reflects the + // current response time. + resendBooster *TimeoutBooster + + // resendBoostPercent is the percentage value the resend timeout will be + // boosted by, any time the Boost function is called for the + // resendBooster. + resendBoostPercent float32 + + // handshakeBooster is used to boost the handshake timeout if we timeout + // when sending the SYN message before receiving the corresponding + // response. The handshake timeout will remain boosted throughout the + // lifespan of the connection if it's boosted. + // The handshake timeout is the time after which the server or client + // will abort and restart the handshake if the expected response is + // not received from the peer. + handshakeBooster *TimeoutBooster + + // handshakeBoostPercent is the percentage value the handshake timeout + // will be by boosted, any time the Boost function is called for the + // handshakeBooster. + handshakeBoostPercent float32 + + // handshakeTimeout is the time after which the server or client + // will abort and restart the handshake if the expected response is + // not received from the peer. + handshakeTimeout time.Duration + + // finSendTimeout is the timeout after which the created context for + // sending a FIN message will be time out. + finSendTimeout time.Duration + + // sendTimeout defines the max time we will wait to send a msg before + // timing out. + sendTimeout time.Duration + + // recvTimeout defines the max time we will wait to receive a msg before + // timing out. + recvTimeout time.Duration + + // pingTime represents at which frequency we will send pings to the + // counterparty if we've received no packet. + pingTime time.Duration + + // pongTime represents how long we will wait for the expect a pong + // response after we've sent a ping. If no response is received within + // the time limit, we will close the connection. + pongTime time.Duration + + // responseCounter represents the current number of corresponding + // responses received since last updating the resend timeout. + responseCounter int + + // timeoutUpdateFrequency represents the frequency of how many + // corresponding responses we need to receive until the resend timeout + // will be updated. + timeoutUpdateFrequency int + + log btclog.Logger + + sentTimes map[uint8]time.Time + sentTimesMu sync.Mutex + + // mu should be locked when updating or accessing any of timeout + // manager's timeout fields. It should also be held when accessing any + // of the timeout manager's fields that get updated throughout the + // lifecycle of the timeout manager after initialization, that doesn't + // have a dedicated mutex. + // + // Note that the lock order for this mutex is before any of the other + // mutexes in the timeout manager. + mu sync.RWMutex +} + +// NewTimeOutManager creates a new timeout manager. 
+func NewTimeOutManager(logger btclog.Logger, + timeoutOpts ...TimeoutOptions) *TimeoutManager { + + if logger == nil { + logger = log + } + + m := &TimeoutManager{ + log: logger, + resendTimeout: defaultResendTimeout, + handshakeTimeout: defaultHandshakeTimeout, + resendBoostPercent: defaultBoostPercent, + handshakeBoostPercent: defaultBoostPercent, + useStaticTimeout: false, + resendMultiplier: defaultResendMultiplier, + finSendTimeout: defaultFinSendTimeout, + recvTimeout: DefaultRecvTimeout, + sendTimeout: DefaultSendTimeout, + sentTimes: make(map[uint8]time.Time), + timeoutUpdateFrequency: defaultTimeoutUpdateFrequency, + } + + for _, opt := range timeoutOpts { + opt(m) + } + + // When we are resending packets, it's likely that we'll resend a range + // of packets. As we don't want every packet in that range to boost the + // resend timeout, we'll initialize the resend booster with a ticker, + // which will ensure that only the first resent packet in the range will + // boost the resend timeout. + m.resendBooster = NewTimeoutBooster( + m.resendTimeout, + m.resendBoostPercent, + true, + ) + + m.handshakeBooster = NewTimeoutBooster( + m.handshakeTimeout, + m.handshakeBoostPercent, + false, + ) + + return m +} + +// Sent should be called when a message is sent by the connection. The resent +// parameter should be set to true if the message is a resent message. +func (m *TimeoutManager) Sent(msg Message, resent bool) { + if m.useStaticTimeout { + return + } + + sentAt := time.Now() + + // We will dynamically update the resend timeout throughout the lifetime + // of the connection, to ensure that it reflects the current response + // time. Therefore, we'll keep track of when we sent a package, and when + // we receive the corresponding response. + // If we're resending a message, we can't know if a corresponding + // response is the response to the resent message, or the original + // message. Therefore, we never update the resend timeout after + // resending a message. + switch msg := msg.(type) { + case *PacketSYN: + m.latestSentSYNTimeMu.Lock() + defer m.latestSentSYNTimeMu.Unlock() + + if !resent { + m.latestSentSYNTime = sentAt + + return + } + + // If we've resent the SYN, we'll reset the latestSentSYNTime to + // the zero value, to ensure that we don't update the resend + // timeout based on the corresponding response, as we can't know + // if the response is for the resent SYN or the original SYN. + m.latestSentSYNTime = time.Time{} + + // We'll also temporarily boost the handshake timeout while + // we're resending the SYN message. This might occur multiple + // times until we receive the corresponding response. + m.handshakeBooster.Boost() + + case *PacketData: + m.sentTimesMu.Lock() + defer m.sentTimesMu.Unlock() + + if resent { + // If we're resending a data packet, we'll delete the + // sent time for the sequence, to ensure that we won't + // update the resend timeout when we receive the + // corresponding response. + delete(m.sentTimes, msg.Seq) + + m.resendBooster.Boost() + + return + } + + m.sentTimes[msg.Seq] = sentAt + } +} + +// Received should be called when a message is received by the connection. 
+func (m *TimeoutManager) Received(msg Message) { + if m.useStaticTimeout { + return + } + + receivedAt := time.Now() + + // We lock the TimeoutManager's mu as soon as Received is executed, to + // ensure that any GetResendTimeout call we receive concurrently after + // this Received call, will return an updated resend timeout if this + // Received call does update the timeout. + m.mu.Lock() + defer m.mu.Unlock() + + switch msg := msg.(type) { + case *PacketSYN, *PacketSYNACK: + m.latestSentSYNTimeMu.Lock() + + if m.latestSentSYNTime.IsZero() { + m.latestSentSYNTimeMu.Unlock() + + return + } + + responseTime := receivedAt.Sub(m.latestSentSYNTime) + + m.latestSentSYNTime = time.Time{} + + m.latestSentSYNTimeMu.Unlock() + + m.updateResendTimeoutUnsafe(responseTime) + + case *PacketACK: + m.sentTimesMu.Lock() + + sentTime, ok := m.sentTimes[msg.Seq] + if !ok { + m.sentTimesMu.Unlock() + + return + } + + delete(m.sentTimes, msg.Seq) + + m.sentTimesMu.Unlock() + + m.responseCounter++ + + reachedFrequency := m.responseCounter% + m.timeoutUpdateFrequency == 0 + + // In case we never set the resend timeout dynamically in the + // handshake due to needing to resend the SYN, or if we've + // reached received the number of packages matching the + // timeoutUpdateFrequency, we'll update the resend timeout. + if !m.hasSetDynamicTimeout || reachedFrequency { + m.responseCounter = 0 + + m.updateResendTimeoutUnsafe(receivedAt.Sub(sentTime)) + } + } +} + +// updateResendTimeout updates the resend timeout based on the given response +// time. The resend timeout will be only be updated if the given response time +// is greater than the default resend timeout, after being multiplied by the +// resendMultiplier. +// +// NOTE: This function TimeoutManager mu must be held when calling this +// function. +func (m *TimeoutManager) updateResendTimeoutUnsafe(responseTime time.Duration) { + m.hasSetDynamicTimeout = true + + multipliedTimeout := time.Duration(m.resendMultiplier) * responseTime + + if multipliedTimeout < minimumResendTimeout { + m.log.Tracef("Setting resendTimeout to minimumResendTimeout "+ + "%v as the new dynamic timeout %v is not greater than "+ + "the minimum resendTimeout.", + m.resendTimeout, multipliedTimeout) + multipliedTimeout = minimumResendTimeout + } + + m.log.Tracef("Updating resendTimeout to %v", multipliedTimeout) + + m.resendTimeout = multipliedTimeout + + // Also update and reset the resend booster, as the new dynamic + // resend timeout most accurately reflects the current response + // time. + m.resendBooster.Reset(multipliedTimeout) +} + +// GetResendTimeout returns the current resend timeout. +func (m *TimeoutManager) GetResendTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + resendTimeout := m.resendBooster.GetCurrentTimeout() + + m.log.Debugf("Returning resendTimeout %v", resendTimeout) + + return resendTimeout +} + +// GetHandshakeTimeout returns the handshake timeout. +func (m *TimeoutManager) GetHandshakeTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + handshake := m.handshakeBooster.GetCurrentTimeout() + + m.log.Debugf("Returning handshakeTimeout %v", handshake) + + return handshake +} + +// GetFinSendTimeout returns the fin send timeout. +func (m *TimeoutManager) GetFinSendTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.finSendTimeout +} + +// GetSendTimeout returns the send timeout. 
+func (m *TimeoutManager) GetSendTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.sendTimeout +} + +// GetRecvTimeout returns the recv timeout. +func (m *TimeoutManager) GetRecvTimeout() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.recvTimeout +} + +// GetPingTime returns the ping time, representing at which frequency we will +// send pings to the counterparty if we've received no packet. +func (m *TimeoutManager) GetPingTime() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + if m.pingTime == 0 { + return time.Duration(math.MaxInt64) + } + + return m.pingTime +} + +// GetPongTime returns the pong timeout, representing how long we will wait for +// the expect a pong response after we've sent a ping. If no response is +// received within the time limit, we will close the connection. +func (m *TimeoutManager) GetPongTime() time.Duration { + m.mu.RLock() + defer m.mu.RUnlock() + + if m.pongTime == 0 { + return time.Duration(math.MaxInt64) + } + + return m.pongTime +} + +// SetSendTimeout sets the send timeout. +func (m *TimeoutManager) SetSendTimeout(timeout time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.sendTimeout = timeout +} + +// SetRecvTimeout sets the receive timeout. +func (m *TimeoutManager) SetRecvTimeout(timeout time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + + m.recvTimeout = timeout +} diff --git a/gbn/timeout_manager_test.go b/gbn/timeout_manager_test.go new file mode 100644 index 00000000..f200dd9b --- /dev/null +++ b/gbn/timeout_manager_test.go @@ -0,0 +1,393 @@ +package gbn + +import ( + "sync" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/stretchr/testify/require" +) + +// BenchmarkTimeoutMgrSynchronously benchmarks the timeout manager when sending +// and receiving messages synchronously. +func BenchmarkTimeoutMgrSynchronously(b *testing.B) { + // Create a new timeout manager to use for the test. We set the timeout + // update frequency 2, so that the resend timeout is dynamically set + // every other message. + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(2)) + + for n := 0; n < b.N; n++ { + msg := &PacketData{Seq: uint8(n)} + + tm.Sent(msg, false) + tm.Received(msg) + } +} + +// BenchmarkTimeoutMgrConcurrently benchmarks the timeout manager when sending +// and receiving messages concurrently. +func BenchmarkTimeoutMgrConcurrently(b *testing.B) { + // Create a new timeout manager to use for the test. We set the timeout + // update frequency 2, so that the resend timeout is dynamically set + // every other message. + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(2)) + + var wg sync.WaitGroup + for n := 0; n < b.N; n++ { + wg.Add(1) + go func(seq uint8) { + defer wg.Done() + + msg := &PacketData{Seq: seq} + + tm.Sent(msg, false) + tm.Received(msg) + }(uint8(n)) + } + + wg.Wait() +} + +// TestStressTestTimeoutMgr tests that the timeout manager can handle a large +// number of concurrent Sent & Received calls, to ensure that the functions does +// not cause any deadlocks. 
+func TestStressTestTimeoutMgr(t *testing.T) { + t.Parallel() + + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(2)) + + var wg sync.WaitGroup + for n := 0; n < 100000; n++ { + wg.Add(1) + go func(seq uint8) { + defer wg.Done() + + msg := &PacketData{Seq: seq} + + tm.Sent(msg, false) + tm.Received(msg) + }(uint8(n)) + } + + wg.Wait() +} + +// TestDynamicTimeout ensures that the resend timeout is dynamically set as +// expected in the timeout manager, with the SYN message that's sent with the +// handshake. +func TestSYNDynamicTimeout(t *testing.T) { + t.Parallel() + + // Create a new timeout manager to use for the test. + tm := NewTimeOutManager(nil) + + // First, we'll ensure that the resend timeout doesn't change if we + // don't send and receive messages. + noResendTimeoutChange(t, tm, time.Second) + + // Next, we'll simulate that a SYN message has been sent and received. + // This should change the resend timeout given that the new timeout is + // greater than the minimum allowed timeout. + initialResendTimeout := tm.GetResendTimeout() + + synMsg := &PacketSYN{N: 20} + + sendAndReceive(t, tm, synMsg, synMsg, false) + + // The resend timeout should now have dynamically changed. Since the + // sendAndReceive function waits for one second before simulating the + // response, execution of the function must have more than 1 sec. + // We are then sure that the resend timeout has been dynamically + // set to a value greater default 1 second resend timeout. + resendTimeout := tm.GetResendTimeout() + require.Greater(t, resendTimeout, initialResendTimeout) + + // Let's also test that the resend timeout is dynamically set to the + // expected value, and that the resend multiplier works as expected. If + // we set the resend multiplier to 10, then send and receive a response + // after 1 second, then the resend timeout should be around 10 seconds. + tm.resendMultiplier = 10 + + sendAndReceive(t, tm, synMsg, synMsg, false) + + // As it takes a short amount of time to simulate the send and receive + // of the message, we'll accept a set resend timeout within a range of + // 10-11 seconds as correct. + resendTimeout = tm.GetResendTimeout() + require.InDelta(t, time.Second*10, resendTimeout, float64(time.Second)) + + // We'll also test that the resend timeout isn't dynamically set if + // the new timeout is less than the minimum allowed resend timeout. + tm.resendMultiplier = 1 + + sendAndReceiveWithDuration( + t, tm, minimumResendTimeout/10, synMsg, synMsg, false, + ) + + newTimeout := tm.GetResendTimeout() + require.Equal(t, minimumResendTimeout, newTimeout) + + // Then we'll test that the resend timeout isn't dynamically set if + // when simulating a that the SYN message has been resent, but that the + // handshake timeout is boosted. + tm.handshakeBooster.boostPercent = 0.2 + originalHandshakeTimeout := tm.GetHandshakeTimeout() + + sendAndReceive(t, tm, synMsg, synMsg, true) + + unchangedResendTimeout := tm.GetResendTimeout() + require.Equal(t, newTimeout, unchangedResendTimeout) + + newHandshakeTimeout := tm.GetHandshakeTimeout() + require.Equal( + t, + time.Duration(float32(originalHandshakeTimeout)*1.2), + newHandshakeTimeout, + ) +} + +// TestDataPackageDynamicTimeout ensures that the resend timeout is dynamically +// set as expected in the timeout manager, when PacketData messages and their +// corresponding response are exchanged between the counterparties. +func TestDataPackageDynamicTimeout(t *testing.T) { + t.Parallel() + + // Create a new timeout manager to use for the test. 
We set the timeout + // update frequency to a high value so that we're sure that it's not the + // reason for the first the resend timeout change. + tm := NewTimeOutManager(nil, WithTimeoutUpdateFrequency(1000)) + + // Next, we'll simulate that a data packet has been sent and received. + // This should change the resend timeout despite the timeout update + // frequency being set to a high value, as we never set the resend + // timeout with in the handshake with by a SYN msg + response. + initialResendTimeout := tm.GetResendTimeout() + + msg := &PacketData{Seq: 20} + response := &PacketACK{Seq: 20} + + sendAndReceive(t, tm, msg, response, false) + + // The resend timeout should now have dynamically changed. + resendTimeout := tm.GetResendTimeout() + require.NotEqual(t, initialResendTimeout, resendTimeout) + + // Now let's test that the timeout update frequency works as expected. + // If we set it to 2, we should only update the resend timeout on the + // second data packet send + receive (as the receive counter in the + // timeout manager was just reset above when setting the resend + // timeout). + tm.timeoutUpdateFrequency = 2 + + // We set resend multiplier to a high value, to ensure that the resend + // timeout is guaranteed to be set to a greater value then the previous + // resend timeout. + tm.resendMultiplier = 10 + + // The first send and receive should not change the resend timeout. + sendAndReceive(t, tm, msg, response, false) + + unchangedResendTimeout := tm.GetResendTimeout() + require.Equal(t, resendTimeout, unchangedResendTimeout) + + // The second send and receive should however change the resend timeout. + sendAndReceive(t, tm, msg, response, false) + + newResendTimeout := tm.GetResendTimeout() + require.NotEqual(t, resendTimeout, newResendTimeout) + + // Finally let's test that the resend timeout isn't dynamically set when + // simulating that the data packet has been resent. The resend timeout + // shouldn't be boosted either, as the resend timeout is only boosted + // if we resend a packet after the duration of the previous resend time. + tm.timeoutUpdateFrequency = 1 + tm.resendMultiplier = 100 + + sendAndReceive(t, tm, msg, response, true) + + unchangedResendTimeout = tm.GetResendTimeout() + require.Equal(t, newResendTimeout, unchangedResendTimeout) +} + +// TestResendBooster tests that the resend timeout booster works as expected, +// and that timeout manager's resendTimeout get's boosted when we need to resend +// a packet again due to not receiving a response within the resend timeout. +func TestResendBooster(t *testing.T) { + t.Parallel() + + tm := NewTimeOutManager(nil) + setResendTimeout := time.Millisecond * 1000 + tm.resendTimeout = setResendTimeout + + initialResendTimeout := tm.GetResendTimeout() + msg := &PacketData{Seq: 20} + response := &PacketACK{Seq: 20} + + // As the resend timeout won't be dynamically set when we are resending + // packets, we'll first test that the resend timeout didn't get + // dynamically updated by a resent data packet. This will however + // boost the resend timeout, so let's initially set the boost percent + // to 0 so we can test that the resend timeout wasn't set. 
+ tm.timeoutUpdateFrequency = 1 + tm.resendMultiplier = 1 + + tm.resendBooster.boostPercent = 0 + + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + unchangedResendTimeout := tm.GetResendTimeout() + require.Equal(t, initialResendTimeout, unchangedResendTimeout) + + // Now let's change the boost percent to a non-zero value and test that + // the resend timeout was boosted as expected. + tm.resendBooster.boostPercent = 0.1 + + changedResendTimeout := tm.GetResendTimeout() + + require.Equal( + t, + time.Duration(float32(initialResendTimeout)*1.1), + changedResendTimeout, + ) + + // Now let's resend another packet again, which shouldn't boost the + // resend timeout again, as the duration of the previous resend timeout + // hasn't passed. + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + unchangedResendTimeout = tm.GetResendTimeout() + + require.Equal( + t, + time.Duration(float32(initialResendTimeout)*1.1), + unchangedResendTimeout, + ) + + // Now let's wait for the duration of the previous resend timeout and + // then resend another packet. This should boost the resend timeout + // once more, as the duration of the previous resend timeout has passed. + err := wait.Invariant(func() bool { + currentResendTimeout := tm.GetResendTimeout() + + return unchangedResendTimeout == currentResendTimeout + }, setResendTimeout) + require.NoError(t, err) + + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + changedResendTimeout = tm.GetResendTimeout() + + require.Equal( + t, + time.Duration(float32(initialResendTimeout)*1.2), + changedResendTimeout, + ) + + // Now let's verify that in case the resend timeout is dynamically set, + // the boost of the resend timeout is reset. Note that we're not + // simulating a resend here, as that will dynamically set the resend + // timeout as the timeout update frequency is set to 1. + sendAndReceiveWithDuration( + t, tm, time.Second, msg, response, false, + ) + + newResendTimeout := tm.GetResendTimeout() + + require.NotEqual(t, changedResendTimeout, newResendTimeout) + require.Equal(t, 0, tm.resendBooster.boostCount) + + // Finally let's check that the resend timeout isn't boosted if we + // simulate a resend before the duration of the newly set resend + // timeout hasn't passed. + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + require.Equal(t, 0, tm.resendBooster.boostCount) + + // But if we wait for the duration of the newly set resend timeout and + // then simulate a resend, then the resend timeout should be boosted. + err = wait.Invariant(func() bool { + currentResendTimeout := tm.GetResendTimeout() + + return newResendTimeout == currentResendTimeout + }, newResendTimeout) + require.NoError(t, err) + + sendAndReceiveWithDuration( + t, tm, time.Millisecond, msg, response, true, + ) + + require.Equal(t, 1, tm.resendBooster.boostCount) +} + +// TestStaticTimeout ensures that the resend timeout isn't dynamically set if a +// static timeout has been set. +func TestStaticTimeout(t *testing.T) { + t.Parallel() + + // Create a new timeout manager with a set static resend timeout to use + // for the test. + staticTimeout := time.Second * 2 + tm := NewTimeOutManager(nil, WithStaticResendTimeout(staticTimeout)) + + synMsg := &PacketSYN{N: 20} + + // Then ensure that the resend timeout isn't dynamically set if we send + // and receive messages after setting a static timeout. 
+	sendAndReceive(t, tm, synMsg, synMsg, false)
+
+	resendTimeout := tm.GetResendTimeout()
+	require.Equal(t, staticTimeout, resendTimeout)
+}
+
+// sendAndReceive simulates that the given message has been sent over the
+// passed timeout manager, and then waits for one second before simulating the
+// corresponding response. While waiting, the function asserts that the resend
+// timeout hasn't changed.
+func sendAndReceive(t *testing.T, tm *TimeoutManager, msg Message,
+	response Message, resent bool) {
+
+	t.Helper()
+
+	sendAndReceiveWithDuration(t, tm, time.Second, msg, response, resent)
+}
+
+// sendAndReceiveWithDuration simulates that the given message has been sent
+// over the passed timeout manager, and then waits for the specified delay
+// before simulating the corresponding response. While waiting, the function
+// asserts that the resend timeout hasn't changed.
+func sendAndReceiveWithDuration(t *testing.T, tm *TimeoutManager,
+	responseDelay time.Duration, msg Message, response Message,
+	resent bool) {
+
+	t.Helper()
+
+	tm.Sent(msg, resent)
+
+	noResendTimeoutChange(t, tm, responseDelay)
+
+	tm.Received(response)
+}
+
+// noResendTimeoutChange asserts that the resend timeout hasn't changed for the
+// passed timeout manager for the specified duration.
+func noResendTimeoutChange(t *testing.T, tm *TimeoutManager,
+	duration time.Duration) {
+
+	t.Helper()
+
+	resendTimeout := tm.GetResendTimeout()
+
+	err := wait.Invariant(func() bool {
+		return resendTimeout == tm.GetResendTimeout()
+	}, duration)
+	require.NoError(t, err)
+}
diff --git a/itest/client_harness.go b/itest/client_harness.go
index 004b0847..b8a19eba 100644
--- a/itest/client_harness.go
+++ b/itest/client_harness.go
@@ -119,6 +119,10 @@ func (c *clientHarness) start() error {
 }
 
 func (c *clientHarness) cleanup() error {
-	c.cancel()
+	// We cancel the context after closing the connection, as it's used
+	// during the connection closing process. We defer the cancel to make
+	// sure it's always canceled.
+	defer c.cancel()
+
 	return c.grpcConn.Close()
 }
diff --git a/itest/connection_test.go b/itest/connection_test.go
index 4ca048d0..bba36578 100644
--- a/itest/connection_test.go
+++ b/itest/connection_test.go
@@ -15,7 +15,7 @@ var (
 	defaultMessage = []byte("some default message")
 )
 
-const defaultTimeout = 30 * time.Second
+const defaultTimeout = 60 * time.Second
 
 // testHappyPath ensures that client and server are able to communicate
 // as expected in the case where no connections are dropped.
diff --git a/mailbox/client_conn.go b/mailbox/client_conn.go
index 445daa3c..26d8a067 100644
--- a/mailbox/client_conn.go
+++ b/mailbox/client_conn.go
@@ -46,6 +46,14 @@ const (
 	// to receive ACKS from the peer before resending the queue.
 	gbnTimeout = 1000 * time.Millisecond
 
+	// gbnResendMultiplier is the multiplier that we want the gbn
+	// connection to use when dynamically setting the resend timeout.
+	gbnResendMultiplier = 5
+
+	// gbnTimeoutUpdateFrequency is the number of packets + responses we
+	// want to observe before we update the resend timeout.
+	gbnTimeoutUpdateFrequency = 200
+
 	// gbnN is the queue size, N, that the gbn server will use. The gbn
 	// server will send up to N packets before requiring an ACK for the
 	// first packet in the queue.
@@ -74,6 +82,12 @@ const (
 	// gbnPongTimout is the time after sending the pong message that we will
 	// timeout if we do not receive any message from our peer.
gbnPongTimeout = 3 * time.Second + + // gbnBoostPercent is the percentage value that the resend and handshake + // timeout will be boosted any time we need to resend a packet due to + // the corresponding response not being received within the previous + // timeout. + gbnBoostPercent = 0.5 ) // ClientStatus is a description of the connection status of the client. @@ -166,10 +180,16 @@ func NewClientConn(ctx context.Context, sid [64]byte, serverHost string, } c.gbnOptions = []gbn.Option{ - gbn.WithTimeout(gbnTimeout), - gbn.WithHandshakeTimeout(gbnHandshakeTimeout), - gbn.WithKeepalivePing( - gbnClientPingTimeout, gbnPongTimeout, + gbn.WithTimeoutOptions( + gbn.WithResendMultiplier(gbnResendMultiplier), + gbn.WithTimeoutUpdateFrequency( + gbnTimeoutUpdateFrequency, + ), + gbn.WithHandshakeTimeout(gbnHandshakeTimeout), + gbn.WithKeepalivePing( + gbnClientPingTimeout, gbnPongTimeout, + ), + gbn.WithBoostPercent(gbnBoostPercent), ), gbn.WithOnFIN(func() { // We force the connection to set a new status after diff --git a/mailbox/server_conn.go b/mailbox/server_conn.go index ac75055c..75d90c23 100644 --- a/mailbox/server_conn.go +++ b/mailbox/server_conn.go @@ -80,10 +80,16 @@ func NewServerConn(ctx context.Context, serverHost string, cancel: cancel, quit: make(chan struct{}), gbnOptions: []gbn.Option{ - gbn.WithTimeout(gbnTimeout), - gbn.WithHandshakeTimeout(gbnHandshakeTimeout), - gbn.WithKeepalivePing( - gbnServerPingTimeout, gbnPongTimeout, + gbn.WithTimeoutOptions( + gbn.WithResendMultiplier(gbnResendMultiplier), + gbn.WithTimeoutUpdateFrequency( + gbnTimeoutUpdateFrequency, + ), + gbn.WithHandshakeTimeout(gbnHandshakeTimeout), + gbn.WithKeepalivePing( + gbnServerPingTimeout, gbnPongTimeout, + ), + gbn.WithBoostPercent(gbnBoostPercent), ), }, status: ServerStatusNotConnected,
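For downstream callers of the gbn package, the old top-level gbn.WithTimeout option is removed by this change, while gbn.WithHandshakeTimeout and gbn.WithKeepalivePing now return TimeoutOptions and must be nested inside gbn.WithTimeoutOptions; a static resend timeout remains available via gbn.WithStaticResendTimeout. A rough before/after migration fragment, using the mailbox constants from this diff (a sketch of the caller-side change, not code that is part of this diff):

// Before this change, callers passed the timeout options directly:
//
//	gbn.WithTimeout(gbnTimeout),
//	gbn.WithHandshakeTimeout(gbnHandshakeTimeout),
//	gbn.WithKeepalivePing(gbnClientPingTimeout, gbnPongTimeout),
//
// After this change, the same knobs are grouped under WithTimeoutOptions,
// and the resend timeout is derived dynamically unless
// WithStaticResendTimeout is used:
gbnOptions := []gbn.Option{
	gbn.WithTimeoutOptions(
		gbn.WithResendMultiplier(gbnResendMultiplier),
		gbn.WithTimeoutUpdateFrequency(gbnTimeoutUpdateFrequency),
		gbn.WithHandshakeTimeout(gbnHandshakeTimeout),
		gbn.WithKeepalivePing(gbnClientPingTimeout, gbnPongTimeout),
		gbn.WithBoostPercent(gbnBoostPercent),
	),
}
_ = gbnOptions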