diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 9641c165..9c1e8a70 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -308,6 +308,8 @@ func (c *Coordinator) dispatchEvent(ctx context.Context, ev BehaviourEvent) { // GetNode retrieves the node associated with the given node id from the DHT's local routing table. // If the node isn't found in the table, it returns ErrNodeNotFound. func (c *Coordinator) GetNode(ctx context.Context, id peer.ID) (Node, error) { + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetNode") + defer span.End() if _, exists := c.rt.GetNode(kadt.PeerID(id).Key()); !exists { return nil, ErrNodeNotFound } @@ -321,6 +323,8 @@ func (c *Coordinator) GetNode(ctx context.Context, id peer.ID) (Node, error) { // GetClosestNodes requests the n closest nodes to the key from the node's local routing table. func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]Node, error) { + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetClosestNodes") + defer span.End() closest := c.rt.NearestNodes(k, n) nodes := make([]Node, 0, len(closest)) for _, id := range closest { @@ -462,3 +466,32 @@ func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.ID) error { return nil } + +// NotifyConnectivity notifies the coordinator that a peer has passed a connectivity check, +// which means it is connected and supports finding closer nodes. +func (c *Coordinator) NotifyConnectivity(ctx context.Context, id peer.ID) error { + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyConnectivity") + defer span.End() + + ai := peer.AddrInfo{ + ID: id, + } + c.routingBehaviour.Notify(ctx, &EventNotifyConnectivity{ + NodeInfo: ai, + }) + + return nil +} + +// NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check, +// which means it is not connected and/or it doesn't support finding closer nodes. +func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id peer.ID) error { + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity") + defer span.End() + + c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{ + NodeID: id, + }) + + return nil +} diff --git a/v2/coord/event.go b/v2/coord/event.go index 2355e259..69a9d5d7 100644 --- a/v2/coord/event.go +++ b/v2/coord/event.go @@ -86,6 +86,8 @@ type EventStopQuery struct { func (*EventStopQuery) behaviourEvent() {} func (*EventStopQuery) queryCommand() {} +// EventAddAddrInfo notifies the routing behaviour of a potential new peer or of additional addresses for +// an existing peer. type EventAddAddrInfo struct { NodeInfo peer.AddrInfo } @@ -93,9 +95,11 @@ type EventAddAddrInfo struct { func (*EventAddAddrInfo) behaviourEvent() {} func (*EventAddAddrInfo) routingCommand() {} +// EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an +// [EventOutboundGetCloserNodes] event, has produced a successful response. type EventGetCloserNodesSuccess struct { QueryID query.QueryID - To peer.AddrInfo + To peer.AddrInfo // To is the peer address that the GetCloserNodes request was sent to. Target kadt.Key CloserNodes []peer.AddrInfo } @@ -103,9 +107,11 @@ type EventGetCloserNodesSuccess struct { func (*EventGetCloserNodesSuccess) behaviourEvent() {} func (*EventGetCloserNodesSuccess) nodeHandlerResponse() {} +// EventGetCloserNodesFailure notifies a behaviour that a GetCloserNodes request, initiated by an +// [EventOutboundGetCloserNodes] event, has failed to produce a valid response.
type EventGetCloserNodesFailure struct { QueryID query.QueryID - To peer.AddrInfo + To peer.AddrInfo // To is the peer address that the GetCloserNodes request was sent to. Target kadt.Key Err error } @@ -141,6 +147,14 @@ type EventRoutingUpdated struct { func (*EventRoutingUpdated) behaviourEvent() {} func (*EventRoutingUpdated) routingNotification() {} +// EventRoutingRemoved is emitted by the coordinator when a node has been removed from the routing table. +type EventRoutingRemoved struct { + NodeID peer.ID +} + +func (*EventRoutingRemoved) behaviourEvent() {} +func (*EventRoutingRemoved) routingNotification() {} + // EventBootstrapFinished is emitted by the coordinator when a bootstrap has finished, either through // running to completion or by being canceled. type EventBootstrapFinished struct { @@ -149,3 +163,23 @@ type EventBootstrapFinished struct { func (*EventBootstrapFinished) behaviourEvent() {} func (*EventBootstrapFinished) routingNotification() {} + +// EventNotifyConnectivity notifies a behaviour that a peer's connectivity and support for finding closer nodes +// has been confirmed, such as from a successful query response or an inbound query. This should not be used for +// general connections to the host, but only when it is confirmed that the peer responds to requests for closer +// nodes. +type EventNotifyConnectivity struct { + NodeInfo peer.AddrInfo +} + +func (*EventNotifyConnectivity) behaviourEvent() {} +func (*EventNotifyConnectivity) routingNotification() {} + +// EventNotifyNonConnectivity notifies a behaviour that a peer is known to not have connectivity and/or to not +// support finding closer nodes. +type EventNotifyNonConnectivity struct { + NodeID peer.ID +} + +func (*EventNotifyNonConnectivity) behaviourEvent() {} +func (*EventNotifyNonConnectivity) routingCommand() {} diff --git a/v2/coord/query.go b/v2/coord/query.go index 8bdfbd53..6857fc6d 100644 --- a/v2/coord/query.go +++ b/v2/coord/query.go @@ -81,6 +81,11 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { CloserNodes: sliceOfAddrInfoToSliceOfKadPeerID(ev.CloserNodes), } case *EventGetCloserNodesFailure: + // queue an event that will notify the routing behaviour of a failed node + p.pending = append(p.pending, &EventNotifyNonConnectivity{ + NodeID: ev.To.ID, + }) + cmd = &query.EventPoolFindCloserFailure[kadt.Key, kadt.PeerID]{ NodeID: kadt.PeerID(ev.To.ID), QueryID: ev.QueryID, diff --git a/v2/coord/query/query.go index e5009a04..5982448d 100644 --- a/v2/coord/query/query.go +++ b/v2/coord/query/query.go @@ -130,14 +130,18 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { switch tev := ev.(type) { case *EventQueryCancel: + span.SetAttributes(tele.AttrEvent("EventQueryCancel")) q.markFinished() return &StateQueryFinished{ QueryID: q.id, Stats: q.stats, } case *EventQueryFindCloserResponse[K, N]: + span.SetAttributes(tele.AttrEvent("EventQueryFindCloserResponse")) q.onMessageResponse(ctx, tev.NodeID, tev.CloserNodes) case *EventQueryFindCloserFailure[K, N]: + span.SetAttributes(tele.AttrEvent("EventQueryFindCloserFailure")) + span.RecordError(tev.Error) q.onMessageFailure(ctx, tev.NodeID) case nil: // TEMPORARY: no event to process @@ -170,6 +174,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { q.inFlight-- q.stats.Failure++ } else if atCapacity() { + span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span returnState =
&StateQueryWaitingAtCapacity{ QueryID: q.id, Stats: q.stats, @@ -186,6 +191,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { // If it has contacted at least NumResults nodes successfully then the iteration is done. if !progressing && successes >= q.cfg.NumResults { q.markFinished() + span.SetAttributes(tele.AttrOutEvent("StateQueryFinished")) // this is the query's tracing span returnState = &StateQueryFinished{ QueryID: q.id, Stats: q.stats, @@ -202,6 +208,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { if q.stats.Start.IsZero() { q.stats.Start = q.cfg.Clock.Now() } + span.SetAttributes(tele.AttrOutEvent("StateQueryFindCloser")) // this is the query's tracing span returnState = &StateQueryFindCloser[K, N]{ NodeID: ni.NodeID, QueryID: q.id, @@ -211,6 +218,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { return true } + span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span returnState = &StateQueryWaitingAtCapacity{ QueryID: q.id, Stats: q.stats, @@ -233,6 +241,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { if q.inFlight > 0 { // The iterator is still waiting for results and not at capacity + span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingWithCapacity")) return &StateQueryWaitingWithCapacity{ QueryID: q.id, Stats: q.stats, @@ -242,6 +251,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState { // The iterator is finished because all available nodes have been contacted // and the iterator is not waiting for any more results. q.markFinished() + span.SetAttributes(tele.AttrOutEvent("StateQueryFinished")) return &StateQueryFinished{ QueryID: q.id, Stats: q.stats, diff --git a/v2/coord/routing.go b/v2/coord/routing.go index 95268cbe..f9edbe3f 100644 --- a/v2/coord/routing.go +++ b/v2/coord/routing.go @@ -91,7 +91,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { } case *EventRoutingUpdated: - span.SetAttributes(attribute.String("event", "EventRoutingUpdated")) + span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeInfo.ID.String())) cmd := &routing.EventProbeAdd[kadt.Key, kadt.PeerID]{ NodeID: addrInfoToKadPeerID(ev.NodeInfo), } @@ -201,6 +201,41 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) } + case *EventNotifyConnectivity: + span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeInfo.ID.String())) + // ignore self + if ev.NodeInfo.ID == peer.ID(r.self) { + break + } + // tell the include state machine in case this is a new peer that could be added to the routing table + cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{ + NodeID: kadt.PeerID(ev.NodeInfo.ID), + } + next, ok := r.advanceInclude(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + + // tell the probe state machine in case there are connectivity checks that could be satisfied + cmdProbe := &routing.EventProbeNotifyConnectivity[kadt.Key, kadt.PeerID]{ + NodeID: kadt.PeerID(ev.NodeInfo.ID), + } + nextProbe, ok := r.advanceProbe(ctx, cmdProbe) + if ok { + r.pending = append(r.pending, nextProbe) + } + case *EventNotifyNonConnectivity: + span.SetAttributes(attribute.String("event", "EventNotifyNonConnectivity"), attribute.String("nodeid", ev.NodeID.String()))
+ + // tell the probe state machine to remove the node from the routing table and probe list + cmdProbe := &routing.EventProbeRemove[kadt.Key, kadt.PeerID]{ + NodeID: kadt.PeerID(ev.NodeID), + } + nextProbe, ok := r.advanceProbe(ctx, cmdProbe) + if ok { + r.pending = append(r.pending, nextProbe) + } + default: panic(fmt.Sprintf("unexpected dht event: %T", ev)) } @@ -351,7 +386,13 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve Notify: r, }, true case *routing.StateProbeNodeFailure[kadt.Key, kadt.PeerID]: - // a node has failed a connectivity check been removed from the routing table and the probe list + // a node has failed a connectivity check and been removed from the routing table and the probe list + + // emit an EventRoutingRemoved event to notify clients that the node has been removed + r.pending = append(r.pending, &EventRoutingRemoved{ + NodeID: peer.ID(st.NodeID), + }) + // add the node to the inclusion list for a second chance r.notify(ctx, &EventAddAddrInfo{ NodeInfo: kadPeerIDToAddrInfo(st.NodeID), diff --git a/v2/coord/routing/bootstrap.go b/v2/coord/routing/bootstrap.go index 2c674b00..683683a7 100644 --- a/v2/coord/routing/bootstrap.go +++ b/v2/coord/routing/bootstrap.go @@ -8,6 +8,7 @@ import ( "github.com/benbjohnson/clock" "github.com/plprobelab/go-kademlia/kad" "github.com/plprobelab/go-kademlia/kaderr" + "go.opentelemetry.io/otel/attribute" "github.com/libp2p/go-libp2p-kad-dht/v2/coord/query" "github.com/libp2p/go-libp2p-kad-dht/v2/tele" @@ -96,6 +97,7 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst switch tev := ev.(type) { case *EventBootstrapStart[K, N]: + span.SetAttributes(tele.AttrEvent("EventBootstrapStart")) // TODO: ignore start event if query is already in progress iter := query.NewClosestNodesIter[K, N](b.self.Key()) @@ -116,17 +118,21 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst return b.advanceQuery(ctx, nil) case *EventBootstrapFindCloserResponse[K, N]: + span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserResponse")) return b.advanceQuery(ctx, &query.EventQueryFindCloserResponse[K, N]{ NodeID: tev.NodeID, CloserNodes: tev.CloserNodes, }) case *EventBootstrapFindCloserFailure[K, N]: + span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserFailure")) + span.RecordError(tev.Error) return b.advanceQuery(ctx, &query.EventQueryFindCloserFailure[K, N]{ NodeID: tev.NodeID, Error: tev.Error, }) case *EventBootstrapPoll: + span.SetAttributes(tele.AttrEvent("EventBootstrapPoll")) // ignore, nothing to do default: panic(fmt.Sprintf("unexpected event: %T", tev)) @@ -140,9 +146,12 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst } func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) BootstrapState { + ctx, span := tele.StartSpan(ctx, "Bootstrap.advanceQuery") + defer span.End() state := b.qry.Advance(ctx, qev) switch st := state.(type) { case *query.StateQueryFindCloser[K, N]: + span.SetAttributes(attribute.String("out_state", "StateQueryFindCloser")) return &StateBootstrapFindCloser[K, N]{ QueryID: st.QueryID, Stats: st.Stats, @@ -150,26 +159,31 @@ func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent Target: st.Target, } case *query.StateQueryFinished: + span.SetAttributes(attribute.String("out_state", "StateBootstrapFinished")) return &StateBootstrapFinished{ Stats: st.Stats, } case *query.StateQueryWaitingAtCapacity: elapsed := 
b.cfg.Clock.Since(st.Stats.Start) if elapsed > b.cfg.Timeout { + span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout")) return &StateBootstrapTimeout{ Stats: st.Stats, } } + span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting")) return &StateBootstrapWaiting{ Stats: st.Stats, } case *query.StateQueryWaitingWithCapacity: elapsed := b.cfg.Clock.Since(st.Stats.Start) if elapsed > b.cfg.Timeout { + span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout")) return &StateBootstrapTimeout{ Stats: st.Stats, } } + span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting")) return &StateBootstrapWaiting{ Stats: st.Stats, } diff --git a/v2/coord/routing/include.go b/v2/coord/routing/include.go index 0830a6b1..749fe931 100644 --- a/v2/coord/routing/include.go +++ b/v2/coord/routing/include.go @@ -106,6 +106,7 @@ func (b *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) IncludeSta switch tev := ev.(type) { case *EventIncludeAddCandidate[K, N]: + span.SetAttributes(tele.AttrEvent("EventIncludeAddCandidate")) // Ignore if already running a check _, checking := b.checks[key.HexString(tev.NodeID.Key())] if checking { @@ -124,20 +125,25 @@ func (b *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) IncludeSta b.candidates.Enqueue(ctx, tev.NodeID) case *EventIncludeConnectivityCheckSuccess[K, N]: + span.SetAttributes(tele.AttrEvent("EventIncludeConnectivityCheckSuccess")) ch, ok := b.checks[key.HexString(tev.NodeID.Key())] if ok { delete(b.checks, key.HexString(tev.NodeID.Key())) if b.rt.AddNode(tev.NodeID) { + span.SetAttributes(tele.AttrOutEvent("StateIncludeRoutingUpdated")) return &StateIncludeRoutingUpdated[K, N]{ NodeID: ch.NodeID, } } } case *EventIncludeConnectivityCheckFailure[K, N]: + span.SetAttributes(tele.AttrEvent("EventIncludeConnectivityCheckFailure")) + span.RecordError(tev.Error) delete(b.checks, key.HexString(tev.NodeID.Key())) case *EventIncludePoll: - // ignore, nothing to do + span.SetAttributes(tele.AttrEvent("EventIncludePoll")) + // ignore, nothing to do default: panic(fmt.Sprintf("unexpected event: %T", tev)) } @@ -153,6 +159,7 @@ func (b *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) IncludeSta if !ok { // No candidate in queue if len(b.checks) > 0 { + span.SetAttributes(tele.AttrOutEvent("StateIncludeWaitingWithCapacity")) return &StateIncludeWaitingWithCapacity{} } return &StateIncludeIdle{} @@ -164,6 +171,7 @@ func (b *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) IncludeSta } // Ask the node to find itself + span.SetAttributes(tele.AttrOutEvent("StateIncludeConnectivityCheck")) return &StateIncludeConnectivityCheck[K, N]{ NodeID: candidate, } diff --git a/v2/coord/routing/probe.go b/v2/coord/routing/probe.go index fd044036..b8f78c60 100644 --- a/v2/coord/routing/probe.go +++ b/v2/coord/routing/probe.go @@ -160,11 +160,13 @@ func (p *Probe[K, N]) Advance(ctx context.Context, ev ProbeEvent) ProbeState { p.nvl.Put(nv) case *EventProbeRemove[K, N]: span.SetAttributes(tele.AttrEvent("EventProbeRemove"), attribute.String("nodeid", tev.NodeID.String())) + p.rt.RemoveKey(tev.NodeID.Key()) p.nvl.Remove(tev.NodeID) return &StateProbeNodeFailure[K, N]{ NodeID: tev.NodeID, } + case *EventProbeConnectivityCheckSuccess[K, N]: span.SetAttributes(tele.AttrEvent("EventProbeMessageResponse"), attribute.String("nodeid", tev.NodeID.String())) nv, found := p.nvl.Get(tev.NodeID) @@ -183,6 +185,7 @@ func (p *Probe[K, N]) Advance(ctx context.Context, ev ProbeEvent) ProbeState { // 
probe failed, so remove from routing table and from list span.SetAttributes(tele.AttrEvent("EventProbeMessageFailure"), attribute.String("nodeid", tev.NodeID.String())) span.RecordError(tev.Error) + p.rt.RemoveKey(tev.NodeID.Key()) p.nvl.Remove(tev.NodeID) return &StateProbeNodeFailure[K, N]{ @@ -211,6 +214,7 @@ func (p *Probe[K, N]) Advance(ctx context.Context, ev ProbeEvent) ProbeState { candidate, found := p.nvl.FindCheckPastDeadline(p.cfg.Clock.Now()) if !found { // nothing suitable for time out + span.SetAttributes(tele.AttrOutEvent("StateProbeWaitingAtCapacity")) return &StateProbeWaitingAtCapacity{} } @@ -228,6 +232,7 @@ func (p *Probe[K, N]) Advance(ctx context.Context, ev ProbeEvent) ProbeState { if !ok { if p.nvl.OngoingCount() > 0 { // waiting for a check but nothing else to do + span.SetAttributes(tele.AttrOutEvent("StateProbeWaitingWithCapacity")) return &StateProbeWaitingWithCapacity{} } // nothing happening and nothing to do @@ -237,6 +242,7 @@ func (p *Probe[K, N]) Advance(ctx context.Context, ev ProbeEvent) ProbeState { p.nvl.MarkOngoing(next.NodeID, p.cfg.Clock.Now().Add(p.cfg.Timeout)) // Ask the node to find itself + span.SetAttributes(tele.AttrOutEvent("StateProbeConnectivityCheck")) return &StateProbeConnectivityCheck[K, N]{ NodeID: next.NodeID, } diff --git a/v2/dht.go b/v2/dht.go index db0e0b63..06086d81 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -151,6 +151,7 @@ func New(h host.Host, cfg *Config) (*DHT, error) { // instantiate a new Kademlia DHT coordinator. coordCfg := coord.DefaultCoordinatorConfig() + coordCfg.Clock = cfg.Clock coordCfg.MeterProvider = cfg.MeterProvider coordCfg.TracerProvider = cfg.TracerProvider diff --git a/v2/go.mod b/v2/go.mod index 3e11da4d..5a0aacd8 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -15,7 +15,7 @@ require ( github.com/libp2p/go-msgio v0.3.0 github.com/multiformats/go-base32 v0.1.0 github.com/multiformats/go-multiaddr v0.11.0 - github.com/plprobelab/go-kademlia v0.0.0-20230911085009-18d957853c57 + github.com/plprobelab/go-kademlia v0.0.0-20230913171354-443ec1f56080 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/otel v1.17.0 go.opentelemetry.io/otel/exporters/jaeger v1.16.0 diff --git a/v2/go.sum b/v2/go.sum index df21f778..f2feab1a 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -259,8 +259,8 @@ github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhM github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/plprobelab/go-kademlia v0.0.0-20230911085009-18d957853c57 h1:9qB1pIoeis/hdhTxVcQLrFYJhGVRJIJESLP/kCud5HE= -github.com/plprobelab/go-kademlia v0.0.0-20230911085009-18d957853c57/go.mod h1:9mz9/8plJj9HWiQmB6JkBNHY30AXzy9LrJ++sCvWqFQ= +github.com/plprobelab/go-kademlia v0.0.0-20230913171354-443ec1f56080 h1:CqaVJqntB6Gm7LILVsIZv0Sdy9kfmi74rwZRt66hPLM= +github.com/plprobelab/go-kademlia v0.0.0-20230913171354-443ec1f56080/go.mod h1:9mz9/8plJj9HWiQmB6JkBNHY30AXzy9LrJ++sCvWqFQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/polydawn/refmt v0.89.0 h1:ADJTApkvkeBZsN0tBTx8QjpD9JkmxbKp0cxfr9qszm4= diff --git a/v2/handlers.go b/v2/handlers.go index 5243a79d..bcd89f9a 100644 --- a/v2/handlers.go +++ b/v2/handlers.go @@ -24,6 +24,9 @@ func (d *DHT) handleFindPeer(ctx context.Context, 
remote peer.ID, req *pb.Messag return nil, fmt.Errorf("handleFindPeer with empty key") } + // tell the coordinator that this peer supports finding closer nodes + d.kad.NotifyConnectivity(ctx, remote) + // "parse" requested peer ID from the key field target := peer.ID(req.GetKey()) diff --git a/v2/handlers_test.go b/v2/handlers_test.go index e80e7192..a94816e1 100644 --- a/v2/handlers_test.go +++ b/v2/handlers_test.go @@ -33,7 +33,9 @@ import ( var rng = rand.New(rand.NewSource(1337)) func newTestDHT(t testing.TB) *DHT { - return newTestDHTWithConfig(t, DefaultConfig()) + cfg := DefaultConfig() + + return newTestDHTWithConfig(t, cfg) } func newTestDHTWithConfig(t testing.TB, cfg *Config) *DHT { diff --git a/v2/internal/kadtest/tracing.go b/v2/internal/kadtest/tracing.go index dc7c82c8..a8125423 100644 --- a/v2/internal/kadtest/tracing.go +++ b/v2/internal/kadtest/tracing.go @@ -2,28 +2,63 @@ package kadtest import ( "context" + "flag" "fmt" "testing" + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/jaeger" - "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" ) +var ( + tracing = flag.Bool("tracing", false, "Enable or disable tracing") + tracingHost = flag.String("tracinghost", "127.0.0.1", "Hostname of tracing collector endpoint") + tracingPort = flag.Int("tracingport", 14268, "Port number of tracing collector endpoint") +) + +// MaybeTrace returns a context containing a new root span named after the test, together with the +// tracer provider used to create it. If tracing is not enabled it returns the supplied context and the +// global tracer provider. This function cannot be called from tests that are run in parallel.
+func MaybeTrace(t *testing.T, ctx context.Context) (context.Context, trace.TracerProvider) { + if !*tracing { + return ctx, otel.GetTracerProvider() + } + + tp := JaegerTracerProvider(t) + t.Logf("Tracing enabled and exporting to %s:%d", *tracingHost, *tracingPort) + + ctx, span := tp.Tracer("kadtest").Start(ctx, t.Name(), trace.WithNewRoot()) + t.Cleanup(func() { + span.End() + }) + + return ctx, tp +} + // JaegerTracerProvider creates a tracer provider that exports traces to a Jaeger instance running // on localhost on port 14268 -func JaegerTracerProvider(t *testing.T) *trace.TracerProvider { +func JaegerTracerProvider(t *testing.T) trace.TracerProvider { t.Helper() - traceHost := "127.0.0.1" - tracePort := 14268 - - endpoint := fmt.Sprintf("http://%s:%d/api/traces", traceHost, tracePort) + endpoint := fmt.Sprintf("http://%s:%d/api/traces", *tracingHost, *tracingPort) exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(endpoint))) if err != nil { t.Fatalf("failed to create jaeger exporter: %v", err) } - tp := trace.NewTracerProvider(trace.WithBatcher(exp)) + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(tele.TracerName), + semconv.DeploymentEnvironmentKey.String("testing"), + )), + ) t.Cleanup(func() { tp.Shutdown(context.Background()) diff --git a/v2/notifee.go b/v2/notifee.go index 8999b7c0..d1889428 100644 --- a/v2/notifee.go +++ b/v2/notifee.go @@ -1,10 +1,12 @@ package dht import ( + "context" "fmt" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" ) // networkEventsSubscription registers a subscription on the libp2p event bus @@ -49,6 +51,7 @@ func (d *DHT) consumeNetworkEvents(sub event.Subscription) { case event.EvtLocalAddressesUpdated: case event.EvtPeerProtocolsUpdated: case event.EvtPeerIdentificationCompleted: + d.onEvtPeerIdentificationCompleted(evt) case event.EvtPeerConnectednessChanged: default: d.log.Warn("unknown libp2p event", "type", fmt.Sprintf("%T", evt)) @@ -84,3 +87,10 @@ func (d *DHT) onEvtLocalReachabilityChanged(evt event.EvtLocalReachabilityChange d.log.With("reachability", evt.Reachability).Warn("unknown reachability type") } } + +func (d *DHT) onEvtPeerIdentificationCompleted(evt event.EvtPeerIdentificationCompleted) { + // tell the coordinator about a new candidate for inclusion in the routing table + d.kad.AddNodes(context.Background(), []peer.AddrInfo{ + {ID: evt.Peer}, + }) +} diff --git a/v2/notifee_test.go b/v2/notifee_test.go index e8f3d90e..a42f82bf 100644 --- a/v2/notifee_test.go +++ b/v2/notifee_test.go @@ -2,11 +2,14 @@ package dht import ( "testing" + "time" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/event" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDHT_consumeNetworkEvents_onEvtLocalReachabilityChanged(t *testing.T) { @@ -65,3 +68,21 @@ func TestDHT_consumeNetworkEvents_onEvtLocalReachabilityChanged(t *testing.T) { assert.Equal(t, modeServer, d.mode) }) } + +func TestDHT_consumeNetworkEvents_onEvtPeerIdentificationCompleted(t *testing.T) { + ctx := kadtest.CtxShort(t) + + d1 := newServerDht(t, nil) + d2 := newServerDht(t, nil) + + // make sure d1 has the address of d2 in its peerstore + d1.host.Peerstore().AddAddrs(d2.host.ID(), d2.host.Addrs(), time.Minute) + + // send the event + 
d1.onEvtPeerIdentificationCompleted(event.EvtPeerIdentificationCompleted{ + Peer: d2.host.ID(), + }) + + _, err := expectRoutingUpdated(t, ctx, d1.kad.RoutingNotifications(), d2.host.ID()) + require.NoError(t, err) +} diff --git a/v2/query_test.go b/v2/query_test.go index 29fa004a..b96c0b33 100644 --- a/v2/query_test.go +++ b/v2/query_test.go @@ -1,11 +1,19 @@ package dht import ( + "context" + "fmt" "testing" + "time" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + + "github.com/libp2p/go-libp2p-kad-dht/v2/coord" + "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" + "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" ) func newServerHost(t testing.TB) host.Host { @@ -39,6 +47,7 @@ func newClientHost(t testing.TB) host.Host { func newServerDht(t testing.TB, cfg *Config) *DHT { h := newServerHost(t) + var err error if cfg == nil { cfg = DefaultConfig() } @@ -47,6 +56,10 @@ func newServerDht(t testing.TB, cfg *Config) *DHT { d, err := New(h, cfg) require.NoError(t, err) + // add at least 1 entry in the routing table so the server will pass connectivity checks + fillRoutingTable(t, d, 1) + require.NotEmpty(t, d.rt.NearestNodes(kadt.PeerID(d.host.ID()).Key(), 1)) + t.Cleanup(func() { if err = d.Close(); err != nil { t.Logf("unexpected error when closing dht: %s", err) @@ -58,6 +71,7 @@ func newServerDht(t testing.TB, cfg *Config) *DHT { func newClientDht(t testing.TB, cfg *Config) *DHT { h := newClientHost(t) + var err error if cfg == nil { cfg = DefaultConfig() } @@ -72,3 +86,144 @@ func newClientDht(t testing.TB, cfg *Config) *DHT { }) return d } + +// expectRoutingUpdated selects on the event channel until an EventRoutingUpdated event is seen for the specified peer id +func expectRoutingUpdated(t *testing.T, ctx context.Context, events <-chan coord.RoutingNotification, id peer.ID) (*coord.EventRoutingUpdated, error) { + t.Helper() + for { + select { + case ev := <-events: + if tev, ok := ev.(*coord.EventRoutingUpdated); ok { + if tev.NodeInfo.ID == id { + return tev, nil + } + t.Logf("saw routing update for %s", tev.NodeInfo.ID) + } + case <-ctx.Done(): + return nil, fmt.Errorf("test deadline exceeded while waiting for routing update event") + } + } +} + +// expectRoutingRemoved selects on the event channel until an EventRoutingRemoved event is seen for the specified peer id +func expectRoutingRemoved(t *testing.T, ctx context.Context, events <-chan coord.RoutingNotification, id peer.ID) (*coord.EventRoutingRemoved, error) { + t.Helper() + for { + select { + case ev := <-events: + if tev, ok := ev.(*coord.EventRoutingRemoved); ok { + if tev.NodeID == id { + return tev, nil + } + t.Logf("saw routing removed for %s", tev.NodeID) + } + case <-ctx.Done(): + return nil, fmt.Errorf("test deadline exceeded while waiting for routing removed event") + } + } +} + +func connect(t *testing.T, ctx context.Context, a, b *DHT) { + t.Helper() + + remoteAddrInfo := peer.AddrInfo{ + ID: b.host.ID(), + Addrs: b.host.Addrs(), + } + + // Add b's addresses to a + err := a.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute) + require.NoError(t, err) + + // the include state machine runs in the background for a and eventually should add the node to the routing table + _, err = expectRoutingUpdated(t, ctx, a.kad.RoutingNotifications(), b.host.ID()) + require.NoError(t, err) + + // the routing table should now contain the node + _, err = a.kad.GetNode(ctx, b.host.ID()) + require.NoError(t, err) +} + +// 
connectLinearChain connects the dhts together in a linear chain. +// The dhts are configured with routing tables that contain immediate neighbours. +func connectLinearChain(t *testing.T, ctx context.Context, dhts ...*DHT) { + for i := 1; i < len(dhts); i++ { + connect(t, ctx, dhts[i-1], dhts[i]) + connect(t, ctx, dhts[i], dhts[i-1]) + } +} + +func TestRTAdditionOnSuccessfulQuery(t *testing.T) { + ctx := kadtest.CtxShort(t) + ctx, tp := kadtest.MaybeTrace(t, ctx) + + cfg := DefaultConfig() + cfg.TracerProvider = tp + + d1 := newServerDht(t, cfg) + d2 := newServerDht(t, cfg) + d3 := newServerDht(t, cfg) + + connectLinearChain(t, ctx, d1, d2, d3) + + // d3 does not know about d1 + _, err := d3.kad.GetNode(ctx, d1.host.ID()) + require.ErrorIs(t, err, coord.ErrNodeNotFound) + + // d1 does not know about d3 + _, err = d1.kad.GetNode(ctx, d3.host.ID()) + require.ErrorIs(t, err, coord.ErrNodeNotFound) + + // but when d3 queries d2, d1 and d3 discover each other + _, _ = d3.FindPeer(ctx, "something") + // ignore the error + + // d3 should update its routing table to include d1 during the query + _, err = expectRoutingUpdated(t, ctx, d3.kad.RoutingNotifications(), d1.host.ID()) + require.NoError(t, err) + + // d3 now has d1 in its routing table + _, err = d3.kad.GetNode(ctx, d1.host.ID()) + require.NoError(t, err) + + // d1 should update its routing table to include d3 during the query + _, err = expectRoutingUpdated(t, ctx, d1.kad.RoutingNotifications(), d3.host.ID()) + require.NoError(t, err) + + // d1 now has d3 in its routing table + _, err = d1.kad.GetNode(ctx, d3.host.ID()) + require.NoError(t, err) +} + +func TestRTEvictionOnFailedQuery(t *testing.T) { + ctx := kadtest.CtxShort(t) + + cfg := DefaultConfig() + + d1 := newServerDht(t, cfg) + d2 := newServerDht(t, cfg) + connect(t, ctx, d1, d2) + connect(t, ctx, d2, d1) + + // close both hosts so query fails + require.NoError(t, d1.host.Close()) + require.NoError(t, d2.host.Close()) + + // peers will still be in the RT because time is paused and + // no scheduled probes will have taken place + + // d1 still has d2 in the routing table + _, err := d1.kad.GetNode(ctx, d2.host.ID()) + require.NoError(t, err) + + // d2 still has d1 in the routing table + _, err = d2.kad.GetNode(ctx, d1.host.ID()) + require.NoError(t, err) + + // failed queries should remove the queried peers from the routing table + _, _ = d1.FindPeer(ctx, "test") + + // d1 should update its routing table to remove d2 because of the failure + _, err = expectRoutingRemoved(t, ctx, d1.kad.RoutingNotifications(), d2.host.ID()) + require.NoError(t, err) +} diff --git a/v2/router.go b/v2/router.go index f18f2892..2c5ed505 100644 --- a/v2/router.go +++ b/v2/router.go @@ -58,7 +58,7 @@ func (r *Router) SendMessage(ctx context.Context, to peer.AddrInfo, protoID addr // TODO: what to do with addresses in peer.AddrInfo?
if len(r.host.Peerstore().Addrs(to.ID)) == 0 { - return nil, fmt.Errorf("aaah ProtoKadMessage") + return nil, fmt.Errorf("no address for peer %s", to.ID) } var cancel context.CancelFunc diff --git a/v2/routing.go b/v2/routing.go index cc72f849..e17ae434 100644 --- a/v2/routing.go +++ b/v2/routing.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/otel/attribute" otel "go.opentelemetry.io/otel/trace" - "golang.org/x/exp/slog" ) var _ routing.Routing = (*DHT)(nil) @@ -44,7 +43,6 @@ func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { var foundNode coord.Node fn := func(ctx context.Context, node coord.Node, stats coord.QueryStats) error { - slog.Info("visiting node", "id", node.ID()) if node.ID() == id { foundNode = node return coord.ErrSkipRemaining diff --git a/v2/tele/tele.go b/v2/tele/tele.go index 7163f3d0..9309f85a 100644 --- a/v2/tele/tele.go +++ b/v2/tele/tele.go @@ -91,6 +91,11 @@ func AttrEvent(val string) attribute.KeyValue { return attribute.String("event", val) } +// AttrOutEvent creates an attribute that records the name of an event being returned +func AttrOutEvent(val string) attribute.KeyValue { + return attribute.String("out_event", val) +} + // WithAttributes is a function that attaches the provided attributes to the // given context. The given attributes will overwrite any already existing ones. func WithAttributes(ctx context.Context, attrs ...attribute.KeyValue) context.Context {
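The sketch below is not part of the patch; it illustrates how the pieces introduced here are intended to fit together. A caller that holds the *coord.Coordinator can watch the new EventRoutingUpdated and EventRoutingRemoved notifications and feed the results of its own liveness checks back through NotifyConnectivity and NotifyNonConnectivity, mirroring what handleFindPeer and the query behaviour now do internally: an inbound request counts as positive connectivity evidence, while a failed GetCloserNodes call queues an EventNotifyNonConnectivity that removes the peer from the routing table. The checkPeer channel and pingPeer function are hypothetical stand-ins for an application-level connectivity check; the coordinator methods and event types are the ones added or used in this diff.

package example

import (
	"context"
	"log"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
)

// watchRouting logs routing table changes and reports the outcome of application-level
// connectivity checks back to the coordinator. checkPeer and pingPeer are hypothetical
// and supplied by the caller; this function is illustrative only.
func watchRouting(ctx context.Context, c *coord.Coordinator, checkPeer <-chan peer.ID, pingPeer func(context.Context, peer.ID) error) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-c.RoutingNotifications():
			switch tev := ev.(type) {
			case *coord.EventRoutingUpdated:
				// the node passed an inclusion connectivity check and was added to the routing table
				log.Printf("routing table added %s", tev.NodeInfo.ID)
			case *coord.EventRoutingRemoved:
				// the node failed a connectivity check and was removed from the routing table
				log.Printf("routing table removed %s", tev.NodeID)
			}
		case id := <-checkPeer:
			if err := pingPeer(ctx, id); err != nil {
				_ = c.NotifyNonConnectivity(ctx, id) // asks the probe state machine to drop the node
			} else {
				_ = c.NotifyConnectivity(ctx, id) // candidate for inclusion; satisfies pending probes
			}
		}
	}
}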