Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic_host: close swarm on Close #2916

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fxopts = append(fxopts, cfg.QUICReuse...)
} else {
fxopts = append(fxopts,
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
var opts []quicreuse.Option
if !cfg.DisableMetrics {
opts = append(opts, quicreuse.EnableMetrics(cfg.PrometheusRegisterer))
Expand Down Expand Up @@ -469,18 +469,17 @@ func (cfg *Config) NewNode() (host.Host, error) {
fx.Provide(func() event.Bus {
return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
}),
fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StopHook(sw.Close))
return sw, nil
fx.Provide(func() crypto.PrivKey {
return cfg.PeerKey
}),
// Make sure the swarm constructor depends on the quicreuse.ConnManager.
// That way, the ConnManager will be started before the swarm, and more importantly,
// the swarm will be stopped before the ConnManager.
fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm {
fx.Provide(func(eventBus event.Bus, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}
lifecycle.Append(fx.Hook{
OnStart: func(context.Context) error {
// TODO: This method succeeds if listening on one address succeeds. We
Expand All @@ -491,14 +490,13 @@ func (cfg *Config) NewNode() (host.Host, error) {
return sw.Close()
},
})
return sw
return sw, nil
}),
fx.Provide(cfg.newBasicHost),
fx.Provide(func(bh *bhost.BasicHost) host.Host {
return bh
}),
fx.Provide(func(h *swarm.Swarm) peer.ID { return h.LocalPeer() }),
fx.Provide(func(h *swarm.Swarm) crypto.PrivKey { return h.Peerstore().PrivKey(h.LocalPeer()) }),
}
transportOpts, err := cfg.addTransports()
if err != nil {
Expand Down
28 changes: 10 additions & 18 deletions libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -429,24 +428,15 @@ func TestMain(m *testing.M) {
}

func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
relay, err := New(EnableRelayService())
relay, err := New(EnableRelayService(), ForceReachabilityPublic())
require.NoError(t, err)
defer relay.Close()

// Fake that the relay is publicly reachable
emitterForRelay, err := relay.EventBus().Emitter(&event.EvtLocalReachabilityChanged{})
require.NoError(t, err)
defer emitterForRelay.Close()
emitterForRelay.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})

peerBehindRelay, err := New(EnableAutoRelayWithStaticRelays([]peer.AddrInfo{{ID: relay.ID(), Addrs: relay.Addrs()}}))
peerBehindRelay, err := New(
EnableAutoRelayWithStaticRelays([]peer.AddrInfo{{ID: relay.ID(), Addrs: relay.Addrs()}}),
ForceReachabilityPrivate())
require.NoError(t, err)
defer peerBehindRelay.Close()
// Emit an event to tell this peer it is private
emitterForPeerBehindRelay, err := peerBehindRelay.EventBus().Emitter(&event.EvtLocalReachabilityChanged{})
require.NoError(t, err)
defer emitterForPeerBehindRelay.Close()
emitterForPeerBehindRelay.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})

// Use a wrapped resource manager to test that the circuit dialing works
// with it. Look at the PR introducing this test for context
Expand All @@ -467,10 +457,12 @@ func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
res := <-ping.Ping(ctx, h, peerBehindRelay.ID())
require.NoError(t, res.Error)
defer cancel()
require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
res := <-ping.Ping(ctx, h, peerBehindRelay.ID())
return res.Error == nil
}, 5*time.Second, 50*time.Millisecond)
}

func TestHostAddrsFactoryAddsCerthashes(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,10 @@ func (h *BasicHost) Close() error {
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()

if err := h.network.Close(); err != nil {
log.Errorf("swarm close failed: %v", err)
}

h.psManager.Close()
if h.Peerstore() != nil {
h.Peerstore().Close()
Expand Down
10 changes: 9 additions & 1 deletion p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ func TestMultipleClose(t *testing.T) {

require.NoError(t, h.Close())
require.NoError(t, h.Close())
require.NoError(t, h.Close())
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h2.Close()
require.Error(t, h.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
h.Network().Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
_, err = h.NewStream(context.Background(), h2.ID())
require.Error(t, err)
require.Empty(t, h.Addrs())
require.Empty(t, h.AllAddrs())
}

func TestSignedPeerRecordWithNoListenAddrs(t *testing.T) {
Expand Down
Loading