From 59e22879348a993dbc0db213fa604416b2562098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 27 Jun 2019 17:46:58 +0200 Subject: [PATCH 1/4] dht dynamic mode switching. We now listen to local routability events to dynamically switch our DHT mode. Private/unknown routability transition us to client mode; public routability puts us in server mode. --- dht.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++------ dht_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 121 insertions(+), 10 deletions(-) diff --git a/dht.go b/dht.go index 64ef1e5cb..61dbc15ac 100644 --- a/dht.go +++ b/dht.go @@ -86,6 +86,7 @@ type IpfsDHT struct { subscriptions struct { evtPeerIdentification event.Subscription + evtLocalRoutability event.Subscription } } @@ -119,14 +120,20 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er // remove ourselves from network notifs. dht.host.Network().StopNotify((*subscriberNotifee)(dht)) - if dht.subscriptions.evtPeerIdentification != nil { - _ = dht.subscriptions.evtPeerIdentification.Close() + for _, sub := range []event.Subscription{ + dht.subscriptions.evtPeerIdentification, + dht.subscriptions.evtLocalRoutability, + } { + if sub != nil { + _ = sub.Close() + } } return nil }) dht.proc.AddChild(subnot.Process(ctx)) dht.proc.AddChild(dht.providers.Process()) + dht.proc.AddChild(dht.dynamicModeSwitching(ctx)) dht.Validator = cfg.Validator dht.mode = ModeClient @@ -187,16 +194,33 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p bucketSize: bucketSize, } + dht.setupEventSubscribers() + + dht.ctx = dht.newContextWithLocalTags(ctx) + + return dht +} + +func (dht *IpfsDHT) setupEventSubscribers() { var err error - evts := []interface{}{&event.EvtPeerIdentificationCompleted{}, &event.EvtPeerIdentificationFailed{}} - dht.subscriptions.evtPeerIdentification, err = h.EventBus().Subscribe(evts, eventbus.BufSize(256)) + evts := []interface{}{ + &event.EvtPeerIdentificationCompleted{}, + &event.EvtPeerIdentificationFailed{}, + } + dht.subscriptions.evtPeerIdentification, err = dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) if err != nil { logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) } - dht.ctx = dht.newContextWithLocalTags(ctx) - - return dht + evts = []interface{}{ + &event.EvtLocalRoutabilityPublic{}, + &event.EvtLocalRoutabilityPrivate{}, + &event.EvtLocalRoutabilityUnknown{}, + } + dht.subscriptions.evtLocalRoutability, err = dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) + if err != nil { + logger.Errorf("dht not subscribed to local routability events; dynamic mode switching will not work; err: %s", err) + } } // putValueToPeer stores the given key/value pair at the peer 'p' @@ -498,6 +522,33 @@ func (dht *IpfsDHT) SetMode(m DHTMode) error { } } +func (dht *IpfsDHT) dynamicModeSwitching(ctx context.Context) goprocess.Process { + proc := goprocessctx.WithContext(ctx) + watch := func(proc goprocess.Process) { + for { + select { + case ev := <-dht.subscriptions.evtLocalRoutability.Out(): + var err error + switch ev.(type) { + case event.EvtLocalRoutabilityPrivate, event.EvtLocalRoutabilityUnknown: + err = dht.SetMode(ModeClient) + case event.EvtLocalRoutabilityPublic: + err = dht.SetMode(ModeServer) + } + if err == nil { + logger.Infof("processed event %T, switched DHT mode successfully", ev) + } else { + logger.Infof("processed event %T, switching DHT mode failed; err: %s", ev, err) + } + case <-proc.Closing(): + return + } + } + } + proc.Go(watch) + return proc +} + func (dht *IpfsDHT) moveToServerMode() error { dht.mode = ModeServer for _, p := range dht.protocols { diff --git a/dht_test.go b/dht_test.go index 0d9320d05..4826b6b6b 100644 --- a/dht_test.go +++ b/dht_test.go @@ -12,11 +12,12 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - multistream "github.com/multiformats/go-multistream" + "github.com/multiformats/go-multistream" "golang.org/x/xerrors" @@ -26,12 +27,12 @@ import ( opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" swarmt "github.com/libp2p/go-libp2p-swarm/testing" - ci "github.com/libp2p/go-libp2p-testing/ci" + "github.com/libp2p/go-libp2p-testing/ci" travisci "github.com/libp2p/go-libp2p-testing/ci/travis" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ma "github.com/multiformats/go-multiaddr" @@ -1405,3 +1406,62 @@ func TestModeChange(t *testing.T) { err = clientOnly.Ping(ctx, clientToServer.PeerID()) assert.NotNil(t, err) } + +func TestDynamicModeSwitching(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + prober := setupDHT(ctx, t, true) // our test harness + node := setupDHT(ctx, t, true) // the node under test + prober.Host().Peerstore().AddAddrs(node.PeerID(), node.Host().Addrs(), peerstore.AddressTTL) + if _, err := prober.Host().Network().DialPeer(ctx, node.PeerID()); err != nil { + t.Fatal(err) + } + + var err error + var emitters struct { + evtLocalRoutabilityPrivate event.Emitter + evtLocalRoutabilityPublic event.Emitter + evtLocalRoutabilityUnknown event.Emitter + } + + emitters.evtLocalRoutabilityPublic, err = node.host.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic)) + if err != nil { + t.Fatal(err) + } + emitters.evtLocalRoutabilityPrivate, err = node.host.EventBus().Emitter(new(event.EvtLocalRoutabilityPrivate)) + if err != nil { + t.Fatal(err) + } + emitters.evtLocalRoutabilityUnknown, err = node.host.EventBus().Emitter(new(event.EvtLocalRoutabilityUnknown)) + if err != nil { + t.Fatal(err) + } + + emitters.evtLocalRoutabilityPrivate.Emit(event.EvtLocalRoutabilityPrivate{}) + time.Sleep(500 * time.Millisecond) + + err = prober.Ping(ctx, node.PeerID()) + assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) + if l := len(prober.RoutingTable().ListPeers()); l != 0 { + t.Errorf("expected routing table length to be 0; instead is %d", l) + } + + emitters.evtLocalRoutabilityPublic.Emit(event.EvtLocalRoutabilityPublic{}) + time.Sleep(500 * time.Millisecond) + + err = prober.Ping(ctx, node.PeerID()) + assert.Nil(t, err) + if l := len(prober.RoutingTable().ListPeers()); l != 1 { + t.Errorf("expected routing table length to be 1; instead is %d", l) + } + + emitters.evtLocalRoutabilityUnknown.Emit(event.EvtLocalRoutabilityUnknown{}) + time.Sleep(500 * time.Millisecond) + + err = prober.Ping(ctx, node.PeerID()) + assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) + if l := len(prober.RoutingTable().ListPeers()); l != 0 { + t.Errorf("expected routing table length to be 0; instead is %d", l) + } +} From a429183f03ab21c6880fa2238bc8c5a4ca74e20a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 28 Jun 2019 02:31:28 +0200 Subject: [PATCH 2/4] debounce dynamic dht mode switching to mitigate flapping. --- dht.go | 27 ++++++++++++++++++++++----- dht_test.go | 17 +++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/dht.go b/dht.go index 61dbc15ac..a4f636b7e 100644 --- a/dht.go +++ b/dht.go @@ -42,6 +42,10 @@ import ( var logger = logging.Logger("dht") +// DynamicModeSwitchDebouncePeriod is the amount of time to wait before making a dynamic dht mode switch effective. +// It helps mitigate flapping from routability events. +var DynamicModeSwitchDebouncePeriod = 1 * time.Minute + // NumBootstrapQueries defines the number of random dht queries to do to // collect members of the routing table. const NumBootstrapQueries = 5 @@ -525,21 +529,34 @@ func (dht *IpfsDHT) SetMode(m DHTMode) error { func (dht *IpfsDHT) dynamicModeSwitching(ctx context.Context) goprocess.Process { proc := goprocessctx.WithContext(ctx) watch := func(proc goprocess.Process) { + var ( + target DHTMode + debounce <-chan time.Time + ) for { select { case ev := <-dht.subscriptions.evtLocalRoutability.Out(): - var err error switch ev.(type) { case event.EvtLocalRoutabilityPrivate, event.EvtLocalRoutabilityUnknown: - err = dht.SetMode(ModeClient) + target = ModeClient case event.EvtLocalRoutabilityPublic: - err = dht.SetMode(ModeServer) + target = ModeServer + } + if debounce == nil { + debounce = time.After(DynamicModeSwitchDebouncePeriod) } + logger.Infof("processed event %T; scheduled dht mode switch", ev) + + case <-debounce: + err := dht.SetMode(target) + // NOTE: the mode will be printed out as a decimal. if err == nil { - logger.Infof("processed event %T, switched DHT mode successfully", ev) + logger.Infof("switched DHT mode successfully; new mode: %d", target) } else { - logger.Infof("processed event %T, switching DHT mode failed; err: %s", ev, err) + logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err) } + debounce, target = nil, 0 + case <-proc.Closing(): return } diff --git a/dht_test.go b/dht_test.go index 4826b6b6b..089024623 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1408,6 +1408,9 @@ func TestModeChange(t *testing.T) { } func TestDynamicModeSwitching(t *testing.T) { + // remove the debounce period. + DynamicModeSwitchDebouncePeriod = 0 + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1464,4 +1467,18 @@ func TestDynamicModeSwitching(t *testing.T) { if l := len(prober.RoutingTable().ListPeers()); l != 0 { t.Errorf("expected routing table length to be 0; instead is %d", l) } + + ////////////////////////////////////////////////////////////// + // let's activate the debounce, to check we do not flap. + // receiving a "routability public" event should have no immediate effect now. + DynamicModeSwitchDebouncePeriod = 1 * time.Minute + + emitters.evtLocalRoutabilityPublic.Emit(event.EvtLocalRoutabilityPublic{}) + time.Sleep(500 * time.Millisecond) + + err = prober.Ping(ctx, node.PeerID()) + assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) + if l := len(prober.RoutingTable().ListPeers()); l != 0 { + t.Errorf("expected routing table length to be 0; instead is %d", l) + } } From 87f6b0c908de352e67034476588d2bc67171aa60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 28 Jun 2019 18:24:30 +0200 Subject: [PATCH 3/4] actually debounce now. --- dht.go | 27 ++++++++++++++++++++------- dht_test.go | 49 ++++++++++++++++++++++++++++--------------------- 2 files changed, 48 insertions(+), 28 deletions(-) diff --git a/dht.go b/dht.go index a4f636b7e..990d74023 100644 --- a/dht.go +++ b/dht.go @@ -530,9 +530,23 @@ func (dht *IpfsDHT) dynamicModeSwitching(ctx context.Context) goprocess.Process proc := goprocessctx.WithContext(ctx) watch := func(proc goprocess.Process) { var ( - target DHTMode - debounce <-chan time.Time + debouncer = time.NewTimer(0) + target DHTMode ) + defer debouncer.Stop() + + stopTimer := func() { + if debouncer.Stop() { + return + } + select { + case <-debouncer.C: + default: + } + } + + stopTimer() + for { select { case ev := <-dht.subscriptions.evtLocalRoutability.Out(): @@ -542,12 +556,11 @@ func (dht *IpfsDHT) dynamicModeSwitching(ctx context.Context) goprocess.Process case event.EvtLocalRoutabilityPublic: target = ModeServer } - if debounce == nil { - debounce = time.After(DynamicModeSwitchDebouncePeriod) - } + stopTimer() + debouncer.Reset(DynamicModeSwitchDebouncePeriod) logger.Infof("processed event %T; scheduled dht mode switch", ev) - case <-debounce: + case <-debouncer.C: err := dht.SetMode(target) // NOTE: the mode will be printed out as a decimal. if err == nil { @@ -555,7 +568,7 @@ func (dht *IpfsDHT) dynamicModeSwitching(ctx context.Context) goprocess.Process } else { logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err) } - debounce, target = nil, 0 + target = 0 case <-proc.Closing(): return diff --git a/dht_test.go b/dht_test.go index 089024623..f0ce614ed 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1441,44 +1441,51 @@ func TestDynamicModeSwitching(t *testing.T) { t.Fatal(err) } + assertDHTClient := func() { + err = prober.Ping(ctx, node.PeerID()) + assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) + if l := len(prober.RoutingTable().ListPeers()); l != 0 { + t.Errorf("expected routing table length to be 0; instead is %d", l) + } + } + + assertDHTServer := func() { + err = prober.Ping(ctx, node.PeerID()) + assert.Nil(t, err) + if l := len(prober.RoutingTable().ListPeers()); l != 1 { + t.Errorf("expected routing table length to be 1; instead is %d", l) + } + } + emitters.evtLocalRoutabilityPrivate.Emit(event.EvtLocalRoutabilityPrivate{}) time.Sleep(500 * time.Millisecond) - err = prober.Ping(ctx, node.PeerID()) - assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) - if l := len(prober.RoutingTable().ListPeers()); l != 0 { - t.Errorf("expected routing table length to be 0; instead is %d", l) - } + assertDHTClient() emitters.evtLocalRoutabilityPublic.Emit(event.EvtLocalRoutabilityPublic{}) time.Sleep(500 * time.Millisecond) - err = prober.Ping(ctx, node.PeerID()) - assert.Nil(t, err) - if l := len(prober.RoutingTable().ListPeers()); l != 1 { - t.Errorf("expected routing table length to be 1; instead is %d", l) - } + assertDHTServer() emitters.evtLocalRoutabilityUnknown.Emit(event.EvtLocalRoutabilityUnknown{}) time.Sleep(500 * time.Millisecond) - err = prober.Ping(ctx, node.PeerID()) - assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) - if l := len(prober.RoutingTable().ListPeers()); l != 0 { - t.Errorf("expected routing table length to be 0; instead is %d", l) - } + assertDHTClient() ////////////////////////////////////////////////////////////// // let's activate the debounce, to check we do not flap. // receiving a "routability public" event should have no immediate effect now. - DynamicModeSwitchDebouncePeriod = 1 * time.Minute + DynamicModeSwitchDebouncePeriod = 2 * time.Second emitters.evtLocalRoutabilityPublic.Emit(event.EvtLocalRoutabilityPublic{}) time.Sleep(500 * time.Millisecond) - err = prober.Ping(ctx, node.PeerID()) - assert.True(t, xerrors.Is(err, multistream.ErrNotSupported)) - if l := len(prober.RoutingTable().ListPeers()); l != 0 { - t.Errorf("expected routing table length to be 0; instead is %d", l) - } + // the debounce has prevented us from switching modes too soon. + assertDHTClient() + + // wait so that the debounce fires. + time.Sleep(1750 * time.Millisecond) + + // after the debounce period elapses, we will have switched modes. + assertDHTServer() } From 68686f3d38a07f8d80468e4febb1cd6ab10dd55b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 28 Jun 2019 18:57:57 +0200 Subject: [PATCH 4/4] cleanup goprocess usage. --- dht.go | 141 ++++++++++++++++-------------------------- subscriber_notifee.go | 27 ++++---- 2 files changed, 70 insertions(+), 98 deletions(-) diff --git a/dht.go b/dht.go index 41371343b..d3ee3a8c4 100644 --- a/dht.go +++ b/dht.go @@ -87,11 +87,6 @@ type IpfsDHT struct { modeLk sync.Mutex bucketSize int - - subscriptions struct { - evtPeerIdentification event.Subscription - evtLocalRoutability event.Subscription - } } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -122,26 +117,13 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er } } - // set up event subscribers. - dht.setupEventSubscribers() - - dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { - for _, sub := range []event.Subscription{ - dht.subscriptions.evtPeerIdentification, - dht.subscriptions.evtLocalRoutability, - } { - if sub != nil { - _ = sub.Close() - } - } - return nil - }) + dht.proc = goprocessctx.WithContext(ctx) // register for network notifs. - dht.proc.AddChild((*subscriberNotifee)(dht).Process()) + dht.proc.Go((*subscriberNotifee)(dht).subscribe) // switch modes dynamically based on local routability changes. - dht.proc.AddChild(dht.dynamicModeSwitching(ctx)) + dht.proc.Go(dht.dynamicModeSwitching) // handle protocol changes dht.proc.Go(dht.handleProtocolChanges) @@ -205,28 +187,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p return dht } -func (dht *IpfsDHT) setupEventSubscribers() { - var err error - evts := []interface{}{ - &event.EvtPeerIdentificationCompleted{}, - &event.EvtPeerIdentificationFailed{}, - } - dht.subscriptions.evtPeerIdentification, err = dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) - if err != nil { - logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) - } - - evts = []interface{}{ - &event.EvtLocalRoutabilityPublic{}, - &event.EvtLocalRoutabilityPrivate{}, - &event.EvtLocalRoutabilityUnknown{}, - } - dht.subscriptions.evtLocalRoutability, err = dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) - if err != nil { - logger.Errorf("dht not subscribed to local routability events; dynamic mode switching will not work; err: %s", err) - } -} - // putValueToPeer stores the given key/value pair at the peer 'p' func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error { pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0) @@ -526,57 +486,64 @@ func (dht *IpfsDHT) SetMode(m DHTMode) error { } } -func (dht *IpfsDHT) dynamicModeSwitching(ctx context.Context) goprocess.Process { - proc := goprocessctx.WithContext(ctx) - watch := func(proc goprocess.Process) { - var ( - debouncer = time.NewTimer(0) - target DHTMode - ) - defer debouncer.Stop() +func (dht *IpfsDHT) dynamicModeSwitching(proc goprocess.Process) { + evts := []interface{}{ + &event.EvtLocalRoutabilityPublic{}, + &event.EvtLocalRoutabilityPrivate{}, + &event.EvtLocalRoutabilityUnknown{}, + } - stopTimer := func() { - if debouncer.Stop() { - return - } - select { - case <-debouncer.C: - default: - } - } + sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) + if err != nil { + logger.Errorf("dht not subscribed to local routability events; dynamic mode switching will not work; err: %s", err) + } + defer sub.Close() - stopTimer() + var ( + debouncer = time.NewTimer(0) + target DHTMode + ) + defer debouncer.Stop() - for { - select { - case ev := <-dht.subscriptions.evtLocalRoutability.Out(): - switch ev.(type) { - case event.EvtLocalRoutabilityPrivate, event.EvtLocalRoutabilityUnknown: - target = ModeClient - case event.EvtLocalRoutabilityPublic: - target = ModeServer - } - stopTimer() - debouncer.Reset(DynamicModeSwitchDebouncePeriod) - logger.Infof("processed event %T; scheduled dht mode switch", ev) - - case <-debouncer.C: - err := dht.SetMode(target) - // NOTE: the mode will be printed out as a decimal. - if err == nil { - logger.Infof("switched DHT mode successfully; new mode: %d", target) - } else { - logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err) - } - target = 0 + stopTimer := func() { + if debouncer.Stop() { + return + } + select { + case <-debouncer.C: + default: + } + } - case <-proc.Closing(): - return + stopTimer() + + for { + select { + case ev := <-sub.Out(): + switch ev.(type) { + case event.EvtLocalRoutabilityPrivate, event.EvtLocalRoutabilityUnknown: + target = ModeClient + case event.EvtLocalRoutabilityPublic: + target = ModeServer + } + stopTimer() + debouncer.Reset(DynamicModeSwitchDebouncePeriod) + logger.Infof("processed event %T; scheduled dht mode switch", ev) + + case <-debouncer.C: + err := dht.SetMode(target) + // NOTE: the mode will be printed out as a decimal. + if err == nil { + logger.Infof("switched DHT mode successfully; new mode: %d", target) + } else { + logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err) } + target = 0 + + case <-proc.Closing(): + return } } - proc.Go(watch) - return proc } func (dht *IpfsDHT) moveToServerMode() error { diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 9e83cf04a..f54b2ee7a 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -2,6 +2,7 @@ package dht import ( "github.com/jbenet/goprocess" + "github.com/libp2p/go-eventbus" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/network" ma "github.com/multiformats/go-multiaddr" @@ -16,23 +17,27 @@ func (nn *subscriberNotifee) DHT() *IpfsDHT { return (*IpfsDHT)(nn) } -func (nn *subscriberNotifee) Process() goprocess.Process { +func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { dht := nn.DHT() - proc := goprocess.Go(nn.subscribe) dht.host.Network().Notify(nn) - proc.SetTeardown(func() error { - dht.host.Network().StopNotify(nn) - return nil - }) - return proc -} + defer dht.host.Network().StopNotify(nn) + + var err error + evts := []interface{}{ + &event.EvtPeerIdentificationCompleted{}, + &event.EvtPeerIdentificationFailed{}, + } + + sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) + if err != nil { + logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) + } + defer sub.Close() -func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { - dht := nn.DHT() for { select { - case evt, more := <-dht.subscriptions.evtPeerIdentification.Out(): + case evt, more := <-sub.Out(): if !more { return }