Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #109 from libp2p/clean-shutdown
Browse files Browse the repository at this point in the history
remove context from constructor, implement a proper Close method
  • Loading branch information
marten-seemann authored Aug 30, 2021
2 parents 3723bd5 + 7dfcab4 commit 1247ac6
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 1,097 deletions.
41 changes: 31 additions & 10 deletions autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ var log = logging.Logger("autonat")

// AmbientAutoNAT is the implementation of ambient NAT autodiscovery
type AmbientAutoNAT struct {
ctx context.Context
host host.Host

*config

ctx context.Context
ctxCancel context.CancelFunc // is closed when Close is called
backgroundRunning chan struct{} // is closed when the background go routine exits

inboundConn chan network.Conn
observations chan autoNATResult
// status is an autoNATResult reflecting current status.
Expand All @@ -50,7 +53,6 @@ type AmbientAutoNAT struct {

// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired.
type StaticAutoNAT struct {
ctx context.Context
host host.Host
reachability network.Reachability
service *autoNATService
Expand All @@ -62,7 +64,7 @@ type autoNATResult struct {
}

// New creates a new NAT autodiscovery system attached to a host
func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
func New(h host.Host, options ...Option) (AutoNAT, error) {
var err error
conf := new(config)
conf.host = h
Expand All @@ -84,7 +86,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {

var service *autoNATService
if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil {
service, err = newAutoNATService(ctx, conf)
service, err = newAutoNATService(conf)
if err != nil {
return nil, err
}
Expand All @@ -95,19 +97,21 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability})

return &StaticAutoNAT{
ctx: ctx,
host: h,
reachability: conf.reachability,
service: service,
}, nil
}

ctx, cancel := context.WithCancel(context.Background())
as := &AmbientAutoNAT{
ctx: ctx,
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),
ctx: ctx,
ctxCancel: cancel,
backgroundRunning: make(chan struct{}),
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),

emitReachabilityChanged: emitReachabilityChanged,
service: service,
Expand Down Expand Up @@ -159,6 +163,7 @@ func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool {
}

func (as *AmbientAutoNAT) background() {
defer close(as.backgroundRunning)
// wait a bit for the node to come online and establish some connections
// before starting autodetection
delay := as.config.bootDelay
Expand Down Expand Up @@ -426,6 +431,15 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
return candidates[0]
}

func (as *AmbientAutoNAT) Close() error {
as.ctxCancel()
if as.service != nil {
as.service.Disable()
}
<-as.backgroundRunning
return nil
}

func shufflePeers(peers []peer.ID) {
for i := range peers {
j := rand.Intn(i + 1)
Expand All @@ -445,3 +459,10 @@ func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
}
return nil, errors.New("no available address")
}

func (s *StaticAutoNAT) Close() error {
if s.service != nil {
s.service.Disable()
}
return nil
}
4 changes: 2 additions & 2 deletions autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, A
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(ash.ID(), AutoNATProto)
a, _ := New(ctx, h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a, _ := New(h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond
return h, a
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestStaticNat(t *testing.T) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})

nat, err := New(ctx, h, WithReachability(network.ReachabilityPrivate))
nat, err := New(h, WithReachability(network.ReachabilityPrivate))
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autonat

import (
"context"
"io"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -16,6 +17,7 @@ type AutoNAT interface {
// PublicAddr returns the public dial address when NAT status is public and an
// error otherwise
PublicAddr() (ma.Multiaddr, error)
io.Closer
}

// Client is a stateless client interface to AutoNAT peers
Expand Down
27 changes: 13 additions & 14 deletions svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ var streamTimeout = 60 * time.Second

// AutoNATService provides NAT autodetection services to other peers
type autoNATService struct {
ctx context.Context
instance context.CancelFunc
instanceLock sync.Mutex
instanceLock sync.Mutex
instance context.CancelFunc
backgroundRunning chan struct{} // closed when background exits

config *config

Expand All @@ -33,18 +33,14 @@ type autoNATService struct {
}

// NewAutoNATService creates a new AutoNATService instance attached to a host
func newAutoNATService(ctx context.Context, c *config) (*autoNATService, error) {
func newAutoNATService(c *config) (*autoNATService, error) {
if c.dialer == nil {
return nil, errors.New("cannot create NAT service without a network")
}

as := &autoNATService{
ctx: ctx,
return &autoNATService{
config: c,
reqs: make(map[peer.ID]int),
}

return as, nil
}, nil
}

func (as *autoNATService) handleStream(s network.Stream) {
Expand Down Expand Up @@ -190,7 +186,7 @@ func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
as.globalReqs++
as.mx.Unlock()

ctx, cancel := context.WithTimeout(as.ctx, as.config.dialTimeout)
ctx, cancel := context.WithTimeout(context.Background(), as.config.dialTimeout)
defer cancel()

as.config.dialer.Peerstore().ClearAddrs(pi.ID)
Expand All @@ -217,10 +213,11 @@ func (as *autoNATService) Enable() {
if as.instance != nil {
return
}
inst, cncl := context.WithCancel(as.ctx)
as.instance = cncl
ctx, cancel := context.WithCancel(context.Background())
as.instance = cancel
as.backgroundRunning = make(chan struct{})

go as.background(inst)
go as.background(ctx)
}

// Disable the autoNAT service if it is running.
Expand All @@ -230,10 +227,12 @@ func (as *autoNATService) Disable() {
if as.instance != nil {
as.instance()
as.instance = nil
<-as.backgroundRunning
}
}

func (as *autoNATService) background(ctx context.Context) {
defer close(as.backgroundRunning)
as.config.host.SetStreamHandler(AutoNATProto, as.handleStream)

timer := time.NewTimer(as.config.throttleResetPeriod)
Expand Down
16 changes: 8 additions & 8 deletions svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func makeAutoNATConfig(ctx context.Context, t *testing.T) *config {
return &c
}

func makeAutoNATService(ctx context.Context, t *testing.T, c *config) *autoNATService {
as, err := newAutoNATService(ctx, c)
func makeAutoNATService(t *testing.T, c *config) *autoNATService {
as, err := newAutoNATService(c)
if err != nil {
t.Fatal(err)
}
Expand All @@ -48,7 +48,7 @@ func TestAutoNATServiceDialError(t *testing.T) {
c := makeAutoNATConfig(ctx, t)
c.dialTimeout = 1 * time.Second
c.dialPolicy.allowSelfDials = false
_ = makeAutoNATService(ctx, t, c)
_ = makeAutoNATService(t, c)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc)

Expand All @@ -67,7 +67,7 @@ func TestAutoNATServiceDialSuccess(t *testing.T) {
defer cancel()

c := makeAutoNATConfig(ctx, t)
_ = makeAutoNATService(ctx, t, c)
_ = makeAutoNATService(t, c)

hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc)
Expand All @@ -87,7 +87,7 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
c.throttleResetPeriod = time.Second
c.throttleResetJitter = 0
c.throttlePeerMax = 1
_ = makeAutoNATService(ctx, t, c)
_ = makeAutoNATService(t, c)

hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc)
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestAutoNATServiceGlobalLimiter(t *testing.T) {
c.throttleResetJitter = 0
c.throttlePeerMax = 1
c.throttleGlobalMax = 5
_ = makeAutoNATService(ctx, t, c)
_ = makeAutoNATService(t, c)

hs := c.host

Expand Down Expand Up @@ -157,7 +157,7 @@ func TestAutoNATServiceRateLimitJitter(t *testing.T) {
c.throttleResetPeriod = 100 * time.Millisecond
c.throttleResetJitter = 100 * time.Millisecond
c.throttleGlobalMax = 1
svc := makeAutoNATService(ctx, t, c)
svc := makeAutoNATService(t, c)
svc.mx.Lock()
svc.globalReqs = 1
svc.mx.Unlock()
Expand All @@ -178,7 +178,7 @@ func TestAutoNATServiceStartup(t *testing.T) {

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
an, err := New(ctx, h, EnableService(dh.Network()))
an, err := New(h, EnableService(dh.Network()))
an.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 4 additions & 1 deletion test/autonat_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build ignore
// +build ignore

// This separate testing package helps to resolve a circular dependency potentially
// being created between libp2p and libp2p-autonat
package autonat_test
Expand Down Expand Up @@ -31,7 +34,7 @@ func TestAutonatRoundtrip(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err := autonat.New(ctx, service, autonat.EnableService(dialback.Network())); err != nil {
if _, err := autonat.New(service, autonat.EnableService(dialback.Network())); err != nil {
t.Fatal(err)
}

Expand Down
3 changes: 3 additions & 0 deletions test/dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package autonat_test

// needed so that go test ./... doesn't error
6 changes: 0 additions & 6 deletions test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,4 @@ module github.com/libp2p/go-libp2p-autonat/test

go 1.16

require (
github.com/libp2p/go-libp2p v0.14.4
github.com/libp2p/go-libp2p-autonat v0.4.2
github.com/libp2p/go-libp2p-core v0.8.6
)

replace github.com/libp2p/go-libp2p-autonat => ../
Loading

0 comments on commit 1247ac6

Please sign in to comment.