From 8e90ed8c2c2e2067c56c1f5cd4fd1cb58ebe7b5b Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 17 Nov 2022 13:09:36 -0800 Subject: [PATCH] fix: autorelay: treat static relays as just another peer source (#1875) * Treat static relays as just another peer source * Actually call the options in WithStaticRelays * Increase timeout for CI --- p2p/host/autorelay/autorelay_test.go | 34 ++++++++++++++++++++++ p2p/host/autorelay/options.go | 40 ++++++++++++++------------ p2p/host/autorelay/relay_finder.go | 43 +++++----------------------- 3 files changed, 63 insertions(+), 54 deletions(-) diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index bbdec41d8f..005710c289 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -456,3 +456,37 @@ func TestIncorrectInit(t *testing.T) { }() _ = newPrivateNode(t) } + +func TestReconnectToStaticRelays(t *testing.T) { + cl := clock.NewMock() + var staticRelays []peer.AddrInfo + const numStaticRelays = 1 + relays := make([]host.Host, 0, numStaticRelays) + for i := 0; i < numStaticRelays; i++ { + r := newRelay(t) + t.Cleanup(func() { r.Close() }) + relays = append(relays, r) + staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}) + } + + h := newPrivateNode(t, + autorelay.WithStaticRelays(staticRelays), + autorelay.WithClock(cl), + ) + + defer h.Close() + + cl.Add(time.Minute) + require.Eventually(t, func() bool { return numRelays(h) == 1 }, 10*time.Second, 50*time.Millisecond) + + relaysInUse := usedRelays(h) + oldRelay := relaysInUse[0] + for _, r := range relays { + if r.ID() == oldRelay { + r.Network().ClosePeer(h.ID()) + } + } + + cl.Add(time.Hour) + require.Eventually(t, func() bool { return numRelays(h) == 1 }, 10*time.Second, 100*time.Millisecond) +} diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go index 2cfaf0b519..fd13284979 100644 --- a/p2p/host/autorelay/options.go +++ b/p2p/host/autorelay/options.go @@ -14,8 +14,7 @@ type config struct { clock clock.Clock peerSource func(ctx context.Context, num int) <-chan peer.AddrInfo // minimum interval used to call the peerSource callback - minInterval time.Duration - staticRelays []peer.AddrInfo + minInterval time.Duration // see WithMinCandidates minCandidates int // see WithMaxCandidates @@ -44,25 +43,33 @@ var defaultConfig = config{ } var ( - errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays") - errStaticRelaysPeerSource = errors.New("cannot use WithPeerSource and WithStaticRelays") + errAlreadyHavePeerSource = errors.New("can only use a single WithPeerSource or WithStaticRelays") ) type Option func(*config) error func WithStaticRelays(static []peer.AddrInfo) Option { return func(c *config) error { - if c.setMinCandidates { - return errStaticRelaysMinCandidates - } if c.peerSource != nil { - return errStaticRelaysPeerSource - } - if len(c.staticRelays) > 0 { - return errors.New("can't set static relays, static relays already configured") + return errAlreadyHavePeerSource } - c.minCandidates = len(static) - c.staticRelays = static + + WithPeerSource(func(ctx context.Context, numPeers int) <-chan peer.AddrInfo { + if len(static) < numPeers { + numPeers = len(static) + } + c := make(chan peer.AddrInfo, numPeers) + defer close(c) + + for i := 0; i < numPeers; i++ { + c <- static[i] + } + return c + }, 30*time.Second)(c) + WithMinCandidates(len(static))(c) + WithMaxCandidates(len(static))(c) + WithNumRelays(len(static))(c) + return nil } } @@ -80,8 +87,8 @@ func WithStaticRelays(static []peer.AddrInfo) Option { // If the channel is canceled you MUST close the output channel at some point. func WithPeerSource(f func(ctx context.Context, numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option { return func(c *config) error { - if len(c.staticRelays) > 0 { - return errStaticRelaysPeerSource + if c.peerSource != nil { + return errAlreadyHavePeerSource } c.peerSource = f c.minInterval = minInterval @@ -113,9 +120,6 @@ func WithMaxCandidates(n int) Option { // This is to make sure that we don't just randomly connect to the first candidate that we discover. func WithMinCandidates(n int) Option { return func(c *config) error { - if len(c.staticRelays) > 0 { - return errStaticRelaysMinCandidates - } if n > c.maxCandidates { n = c.maxCandidates } diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go index 533c748b42..851d1422e5 100644 --- a/p2p/host/autorelay/relay_finder.go +++ b/p2p/host/autorelay/relay_finder.go @@ -83,7 +83,7 @@ type relayFinder struct { } func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int) <-chan peer.AddrInfo, conf *config) *relayFinder { - if peerSource == nil && len(conf.staticRelays) == 0 { + if peerSource == nil { panic("Can not create a new relayFinder. Need a Peer Source fn or a list of static relays. Refer to the documentation around `libp2p.EnableAutoRelay`") } @@ -103,19 +103,11 @@ func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int) } func (rf *relayFinder) background(ctx context.Context) { - if rf.usesStaticRelay() { - rf.refCount.Add(1) - go func() { - defer rf.refCount.Done() - rf.handleStaticRelays(ctx) - }() - } else { - rf.refCount.Add(1) - go func() { - defer rf.refCount.Done() - rf.findNodes(ctx) - }() - } + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.findNodes(ctx) + }() rf.refCount.Add(1) go func() { @@ -274,23 +266,6 @@ func (rf *relayFinder) findNodes(ctx context.Context) { } } -func (rf *relayFinder) handleStaticRelays(ctx context.Context) { - sem := make(chan struct{}, 4) - var wg sync.WaitGroup - wg.Add(len(rf.conf.staticRelays)) - for _, pi := range rf.conf.staticRelays { - sem <- struct{}{} - go func(pi peer.AddrInfo) { - defer wg.Done() - defer func() { <-sem }() - rf.handleNewNode(ctx, pi) - }(pi) - } - wg.Wait() - log.Debug("processed all static relays") - rf.notifyNewCandidate() -} - func (rf *relayFinder) notifyMaybeConnectToRelay() { select { case rf.maybeConnectToRelayTrigger <- struct{}{}: @@ -450,7 +425,7 @@ func (rf *relayFinder) maybeConnectToRelay(ctx context.Context) { } rf.candidateMx.Lock() - if !rf.usesStaticRelay() && len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay { + if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay { // During the startup phase, we don't want to connect to the first candidate that we find. // Instead, we wait until we've found at least minCandidates, and then select the best of those. // However, if that takes too long (longer than bootDelay), we still go ahead. @@ -643,10 +618,6 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { return raddrs } -func (rf *relayFinder) usesStaticRelay() bool { - return len(rf.conf.staticRelays) > 0 -} - func (rf *relayFinder) Start() error { rf.ctxCancelMx.Lock() defer rf.ctxCancelMx.Unlock()