Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove goprocess from Host #865

Merged
merged 3 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.1.3
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9
)

Expand Down
71 changes: 39 additions & 32 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/identify"
Expand All @@ -21,8 +22,6 @@ import (
inat "github.com/libp2p/go-libp2p-nat"

logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -68,6 +67,13 @@ const NATPortMap Option = iota
// * uses an identity service to send + receive node information
// * uses a nat service to establish NAT port mappings
type BasicHost struct {
ctx context.Context
ctxCancel context.CancelFunc
// ensures we shutdown ONLY once
closeSync sync.Once
// keep track of resources we need to wait on before shutting down
refCount sync.WaitGroup

network network.Network
mux *msmux.MultistreamMuxer
ids *identify.IDService
Expand All @@ -81,8 +87,6 @@ type BasicHost struct {

negtimeout time.Duration

proc goprocess.Process

emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
Expand Down Expand Up @@ -128,6 +132,8 @@ type HostOpts struct {

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHost, error) {
hostCtx, cancel := context.WithCancel(ctx)

h := &BasicHost{
network: net,
mux: msmux.NewMultistreamMuxer(),
Expand All @@ -136,6 +142,8 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
addrChangeChan: make(chan struct{}, 1),
ctx: hostCtx,
ctxCancel: cancel,
}

var err error
Expand All @@ -146,28 +154,12 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
return nil, err
}

h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
if h.natmgr != nil {
h.natmgr.Close()
}
if h.cmgr != nil {
h.cmgr.Close()
}
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
return h.Network().Close()
})

if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
}

// we can't set this as a default above because it depends on the *BasicHost.
h.ids = identify.NewIDService(
goprocessctx.WithProcessClosing(ctx, h.proc),
h,
identify.UserAgent(opts.UserAgent),
)
h.ids = identify.NewIDService(h, identify.UserAgent(opts.UserAgent))

if uint64(opts.NegotiationTimeout) != 0 {
h.negtimeout = opts.NegotiationTimeout
Expand Down Expand Up @@ -242,7 +234,8 @@ func New(net network.Network, opts ...interface{}) *BasicHost {

// Start starts background tasks in the host
func (h *BasicHost) Start() {
h.proc.Go(h.background)
h.refCount.Add(1)
go h.background()
}

// newConnHandler is the remote-opened conn handler for inet.Network
Expand Down Expand Up @@ -343,7 +336,9 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return &evt
}

func (h *BasicHost) background(p goprocess.Process) {
func (h *BasicHost) background() {
defer h.refCount.Done()

// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(10 * time.Second)
Expand All @@ -356,7 +351,7 @@ func (h *BasicHost) background(p goprocess.Process) {
select {
case <-ticker.C:
case <-h.addrChangeChan:
case <-p.Closing():
case <-h.ctx.Done():
return
}

Expand Down Expand Up @@ -805,14 +800,26 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {

// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
// You're thinking of adding some teardown logic here, right? Well
// don't! Add any process teardown logic to the teardown function in the
// constructor.
//
// This:
// 1. May be called multiple times.
// 2. May _never_ be called if the host is stopped by the context.
return h.proc.Close()
h.closeSync.Do(func() {
h.ctxCancel()
if h.natmgr != nil {
h.natmgr.Close()
}
if h.cmgr != nil {
h.cmgr.Close()
}
if h.ids != nil {
h.ids.Close()
}

_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
h.Network().Close()

h.refCount.Wait()
})

return nil
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}

type streamWrapper struct {
Expand Down
11 changes: 11 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/stretchr/testify/require"
)

func TestHostDoubleClose(t *testing.T) {
Expand Down Expand Up @@ -80,6 +81,16 @@ func TestHostSimple(t *testing.T) {
}
}

func TestMultipleClose(t *testing.T) {
ctx := context.Background()
h := New(swarmt.GenSwarm(t, ctx))

require.NoError(t, h.Close())
require.NoError(t, h.Close())
require.NoError(t, h.Close())

}

func TestProtocolHandlerEvents(t *testing.T) {
ctx := context.Background()
h := New(swarmt.GenSwarm(t, ctx))
Expand Down
35 changes: 24 additions & 11 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/libp2p/go-eventbus"
ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/libp2p/go-eventbus"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"

ggio "github.com/gogo/protobuf/io"
Expand Down Expand Up @@ -71,7 +71,12 @@ type IDService struct {
Host host.Host
UserAgent string

ctx context.Context
ctx context.Context
ctxCancel context.CancelFunc
// ensure we shutdown ONLY once
closeSync sync.Once
// track resources that need to be shut down before we shut down
refCount sync.WaitGroup

// connections undergoing identification
// for wait purposes
Expand All @@ -94,7 +99,7 @@ type IDService struct {

// NewIDService constructs a new *IDService and activates it by
// attaching its stream handler to the given host.Host.
func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService {
func NewIDService(h host.Host, opts ...Option) *IDService {
var cfg config
for _, opt := range opts {
opt(&cfg)
Expand All @@ -105,13 +110,15 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService {
userAgent = cfg.userAgent
}

hostCtx, cancel := context.WithCancel(context.Background())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

being able to have the ID service run within a larger context continues to seem valuable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why. It's a separate service and exposes a Close function we can call to shut it down. This is similar to the DHT<-> Routing Table relationship and the same idea works fine for us there.

What are your concerns here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if a consumer is spinning up a bunch of separate services, it's nice to have them all wired up to a central context, and then cancel that context to shut things down, rather than explicitly calling close on each service.

Copy link
Contributor Author

@aarshkshah1992 aarshkshah1992 Apr 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see where you are coming from but we've had problems in the past using contexts for cancellation across service boundaries. Please take a look at libp2p/go-libp2p-kbucket#50 (comment) & the linked discussions in that comment.

The most important benefit to me is being able to control the order in which the services, sub-components etc. are shut down.

s := &IDService{
Host: h,
UserAgent: userAgent,

ctx: ctx,
ctx: hostCtx,
ctxCancel: cancel,
currid: make(map[network.Conn]chan struct{}),
observedAddrs: NewObservedAddrSet(ctx),
observedAddrs: NewObservedAddrSet(hostCtx),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien

Should we also have a Close for the ObservedAddrSet that we can call in IDService.Close()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some day, yes. But not today.

}

// handle local protocol handler updates, and push deltas to peers.
Expand All @@ -120,6 +127,7 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService {
if err != nil {
log.Warningf("identify service not subscribed to local protocol handlers updates; err: %s", err)
} else {
s.refCount.Add(1)
go s.handleEvents()
}

Expand All @@ -143,14 +151,19 @@ func NewIDService(ctx context.Context, h host.Host, opts ...Option) *IDService {
return s
}

// Close shuts down the IDService
func (ids *IDService) Close() error {
ids.closeSync.Do(func() {
ids.ctxCancel()
ids.refCount.Wait()
})
return nil
}

func (ids *IDService) handleEvents() {
sub := ids.subscription
defer func() {
_ = sub.Close()
// drain the channel.
for range sub.Out() {
}
}()
defer ids.refCount.Done()
defer sub.Close()

for {
select {
Expand Down
31 changes: 22 additions & 9 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ func subtestIDService(t *testing.T) {
h1p := h1.ID()
h2p := h2.ID()

ids1 := identify.NewIDService(ctx, h1)
ids2 := identify.NewIDService(ctx, h2)
ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()

testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing
Expand Down Expand Up @@ -253,9 +255,14 @@ func TestLocalhostAddrFiltering(t *testing.T) {
Addrs: p2addrs[1:],
})

_ = identify.NewIDService(ctx, p1)
ids2 := identify.NewIDService(ctx, p2)
ids3 := identify.NewIDService(ctx, p3)
ids1 := identify.NewIDService(p1)
ids2 := identify.NewIDService(p2)
ids3 := identify.NewIDService(p3)
defer func() {
ids1.Close()
ids2.Close()
ids3.Close()
}()

conns := p2.Network().ConnsToPeer(id1)
if len(conns) == 0 {
Expand Down Expand Up @@ -291,8 +298,12 @@ func TestIdentifyDeltaOnProtocolChange(t *testing.T) {

h2.SetStreamHandler(protocol.TestingID, func(_ network.Stream) {})

ids1 := identify.NewIDService(ctx, h1)
_ = identify.NewIDService(ctx, h2)
ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer func() {
ids1.Close()
ids2.Close()
}()

if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -404,8 +415,10 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
defer h2.Close()
defer h1.Close()

_ = identify.NewIDService(ctx, h1)
ids2 := identify.NewIDService(ctx, h2)
ids1 := identify.NewIDService(h1)
ids2 := identify.NewIDService(h2)
defer ids1.Close()
defer ids2.Close()

// replace the original identify handler by one that blocks until we close the block channel.
// this allows us to control how long identify runs.
Expand Down