Skip to content

Commit

Permalink
fix: autorelay: treat static relays as just another peer source (libp…
Browse files Browse the repository at this point in the history
…2p#1875)

* Treat static relays as just another peer source

* Actually call the options in WithStaticRelays

* Increase timeout for CI
  • Loading branch information
MarcoPolo authored Nov 17, 2022
1 parent 0b4867c commit 8e90ed8
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 54 deletions.
34 changes: 34 additions & 0 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,37 @@ func TestIncorrectInit(t *testing.T) {
}()
_ = newPrivateNode(t)
}

func TestReconnectToStaticRelays(t *testing.T) {
cl := clock.NewMock()
var staticRelays []peer.AddrInfo
const numStaticRelays = 1
relays := make([]host.Host, 0, numStaticRelays)
for i := 0; i < numStaticRelays; i++ {
r := newRelay(t)
t.Cleanup(func() { r.Close() })
relays = append(relays, r)
staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()})
}

h := newPrivateNode(t,
autorelay.WithStaticRelays(staticRelays),
autorelay.WithClock(cl),
)

defer h.Close()

cl.Add(time.Minute)
require.Eventually(t, func() bool { return numRelays(h) == 1 }, 10*time.Second, 50*time.Millisecond)

relaysInUse := usedRelays(h)
oldRelay := relaysInUse[0]
for _, r := range relays {
if r.ID() == oldRelay {
r.Network().ClosePeer(h.ID())
}
}

cl.Add(time.Hour)
require.Eventually(t, func() bool { return numRelays(h) == 1 }, 10*time.Second, 100*time.Millisecond)
}
40 changes: 22 additions & 18 deletions p2p/host/autorelay/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ type config struct {
clock clock.Clock
peerSource func(ctx context.Context, num int) <-chan peer.AddrInfo
// minimum interval used to call the peerSource callback
minInterval time.Duration
staticRelays []peer.AddrInfo
minInterval time.Duration
// see WithMinCandidates
minCandidates int
// see WithMaxCandidates
Expand Down Expand Up @@ -44,25 +43,33 @@ var defaultConfig = config{
}

var (
errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays")
errStaticRelaysPeerSource = errors.New("cannot use WithPeerSource and WithStaticRelays")
errAlreadyHavePeerSource = errors.New("can only use a single WithPeerSource or WithStaticRelays")
)

type Option func(*config) error

func WithStaticRelays(static []peer.AddrInfo) Option {
return func(c *config) error {
if c.setMinCandidates {
return errStaticRelaysMinCandidates
}
if c.peerSource != nil {
return errStaticRelaysPeerSource
}
if len(c.staticRelays) > 0 {
return errors.New("can't set static relays, static relays already configured")
return errAlreadyHavePeerSource
}
c.minCandidates = len(static)
c.staticRelays = static

WithPeerSource(func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
if len(static) < numPeers {
numPeers = len(static)
}
c := make(chan peer.AddrInfo, numPeers)
defer close(c)

for i := 0; i < numPeers; i++ {
c <- static[i]
}
return c
}, 30*time.Second)(c)
WithMinCandidates(len(static))(c)
WithMaxCandidates(len(static))(c)
WithNumRelays(len(static))(c)

return nil
}
}
Expand All @@ -80,8 +87,8 @@ func WithStaticRelays(static []peer.AddrInfo) Option {
// If the channel is canceled you MUST close the output channel at some point.
func WithPeerSource(f func(ctx context.Context, numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errStaticRelaysPeerSource
if c.peerSource != nil {
return errAlreadyHavePeerSource
}
c.peerSource = f
c.minInterval = minInterval
Expand Down Expand Up @@ -113,9 +120,6 @@ func WithMaxCandidates(n int) Option {
// This is to make sure that we don't just randomly connect to the first candidate that we discover.
func WithMinCandidates(n int) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errStaticRelaysMinCandidates
}
if n > c.maxCandidates {
n = c.maxCandidates
}
Expand Down
43 changes: 7 additions & 36 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type relayFinder struct {
}

func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int) <-chan peer.AddrInfo, conf *config) *relayFinder {
if peerSource == nil && len(conf.staticRelays) == 0 {
if peerSource == nil {
panic("Can not create a new relayFinder. Need a Peer Source fn or a list of static relays. Refer to the documentation around `libp2p.EnableAutoRelay`")
}

Expand All @@ -103,19 +103,11 @@ func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int)
}

func (rf *relayFinder) background(ctx context.Context) {
if rf.usesStaticRelay() {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.handleStaticRelays(ctx)
}()
} else {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()
}
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()

rf.refCount.Add(1)
go func() {
Expand Down Expand Up @@ -274,23 +266,6 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
}
}

func (rf *relayFinder) handleStaticRelays(ctx context.Context) {
sem := make(chan struct{}, 4)
var wg sync.WaitGroup
wg.Add(len(rf.conf.staticRelays))
for _, pi := range rf.conf.staticRelays {
sem <- struct{}{}
go func(pi peer.AddrInfo) {
defer wg.Done()
defer func() { <-sem }()
rf.handleNewNode(ctx, pi)
}(pi)
}
wg.Wait()
log.Debug("processed all static relays")
rf.notifyNewCandidate()
}

func (rf *relayFinder) notifyMaybeConnectToRelay() {
select {
case rf.maybeConnectToRelayTrigger <- struct{}{}:
Expand Down Expand Up @@ -450,7 +425,7 @@ func (rf *relayFinder) maybeConnectToRelay(ctx context.Context) {
}

rf.candidateMx.Lock()
if !rf.usesStaticRelay() && len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay {
if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay {
// During the startup phase, we don't want to connect to the first candidate that we find.
// Instead, we wait until we've found at least minCandidates, and then select the best of those.
// However, if that takes too long (longer than bootDelay), we still go ahead.
Expand Down Expand Up @@ -643,10 +618,6 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
return raddrs
}

func (rf *relayFinder) usesStaticRelay() bool {
return len(rf.conf.staticRelays) > 0
}

func (rf *relayFinder) Start() error {
rf.ctxCancelMx.Lock()
defer rf.ctxCancelMx.Unlock()
Expand Down

0 comments on commit 8e90ed8

Please sign in to comment.