Skip to content

Commit

Permalink
Test query interactions with routing table (#887)
Browse files Browse the repository at this point in the history
* Test query interactions with routing table

* v2: upgrade to go1.21

* Add uci config

* Use newer uci actions

* Pass clock to coordinator

* Use v2 working directory in actions

* Set go-version input in actions

* Set go-version input in actions

* Use go 1.20.8 in actions

* Use go 1.21.1 and relative working directory

* Try default working directory on job

* Remove uci.yaml which is not supported yet

* Try default working directory on job

* Try default working directory as input

* Restore uci.yaml

* Restore uci.yaml

* Use modified go-check

* Use modified go-test

* Fix go-test

* Fix go-test

* Fix go-test

* Update go-kademlia

* Add more tracing

* Use go-kademlia trie fix
  • Loading branch information
iand authored Sep 18, 2023
1 parent 97e4e02 commit 1d35505
Show file tree
Hide file tree
Showing 20 changed files with 400 additions and 19 deletions.
33 changes: 33 additions & 0 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ func (c *Coordinator) dispatchEvent(ctx context.Context, ev BehaviourEvent) {
// GetNode retrieves the node associated with the given node id from the DHT's local routing table.
// If the node isn't found in the table, it returns ErrNodeNotFound.
func (c *Coordinator) GetNode(ctx context.Context, id peer.ID) (Node, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetNode")
defer span.End()
if _, exists := c.rt.GetNode(kadt.PeerID(id).Key()); !exists {
return nil, ErrNodeNotFound
}
Expand All @@ -321,6 +323,8 @@ func (c *Coordinator) GetNode(ctx context.Context, id peer.ID) (Node, error) {

// GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]Node, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetClosestNodes")
defer span.End()
closest := c.rt.NearestNodes(k, n)
nodes := make([]Node, 0, len(closest))
for _, id := range closest {
Expand Down Expand Up @@ -462,3 +466,32 @@ func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.ID) error {

return nil
}

// NotifyConnectivity informs the coordinator that the peer identified by id has passed a
// connectivity check, meaning it is connected and supports finding closer nodes.
// The confirmation is forwarded to the routing behaviour as an EventNotifyConnectivity.
// The returned error is always nil.
func (c *Coordinator) NotifyConnectivity(ctx context.Context, id peer.ID) error {
	ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyConnectivity")
	defer span.End()

	// Only the peer ID is known at this point, so the AddrInfo carries no addresses.
	confirmed := &EventNotifyConnectivity{NodeInfo: peer.AddrInfo{ID: id}}
	c.routingBehaviour.Notify(ctx, confirmed)
	return nil
}

// NotifyNonConnectivity informs the coordinator that the peer identified by id has failed a
// connectivity check, meaning it is not connected and/or does not support finding closer nodes.
// The failure is forwarded to the routing behaviour as an EventNotifyNonConnectivity.
// The returned error is always nil.
func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id peer.ID) error {
	ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
	defer span.End()

	failed := &EventNotifyNonConnectivity{NodeID: id}
	c.routingBehaviour.Notify(ctx, failed)
	return nil
}
38 changes: 36 additions & 2 deletions v2/coord/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,32 @@ type EventStopQuery struct {
// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventStopQuery satisfy the BehaviourEvent and query-command
// interfaces declared elsewhere — confirm against those definitions.
func (*EventStopQuery) behaviourEvent() {}
func (*EventStopQuery) queryCommand() {}

// EventAddAddrInfo notifies the routing behaviour of a potential new peer or of additional addresses for
// an existing peer.
type EventAddAddrInfo struct {
// NodeInfo identifies the peer and carries the addresses being reported for it.
NodeInfo peer.AddrInfo
}

// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventAddAddrInfo satisfy the BehaviourEvent and routing-command
// interfaces declared elsewhere — confirm against those definitions.
func (*EventAddAddrInfo) behaviourEvent() {}
func (*EventAddAddrInfo) routingCommand() {}

// EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event, has produced a successful response.
type EventGetCloserNodesSuccess struct {
	QueryID     query.QueryID   // QueryID identifies the query the request belongs to.
	To          peer.AddrInfo   // To is the peer address that the GetCloserNodes request was sent to.
	Target      kadt.Key        // Target is the key the request was made for.
	CloserNodes []peer.AddrInfo // CloserNodes holds the nodes returned in the response.
}

// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventGetCloserNodesSuccess satisfy the BehaviourEvent and
// node-handler-response interfaces declared elsewhere — confirm against those definitions.
func (*EventGetCloserNodesSuccess) behaviourEvent()      {}
func (*EventGetCloserNodesSuccess) nodeHandlerResponse() {}

// EventGetCloserNodesFailure notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event, has failed to produce a valid response.
type EventGetCloserNodesFailure struct {
	QueryID query.QueryID // QueryID identifies the query the request belongs to.
	To      peer.AddrInfo // To is the peer address that the GetCloserNodes request was sent to.
	Target  kadt.Key      // Target is the key the request was made for.
	Err     error         // Err is the error that caused the request to fail.
}
Expand Down Expand Up @@ -141,6 +147,14 @@ type EventRoutingUpdated struct {
// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventRoutingUpdated satisfy the BehaviourEvent and
// routing-notification interfaces declared elsewhere — confirm against those definitions.
func (*EventRoutingUpdated) behaviourEvent() {}
func (*EventRoutingUpdated) routingNotification() {}

// EventRoutingRemoved is emitted by the coordinator when a node has been removed from the routing table.
type EventRoutingRemoved struct {
// NodeID is the ID of the peer that was removed.
NodeID peer.ID
}

// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventRoutingRemoved satisfy the BehaviourEvent and
// routing-notification interfaces declared elsewhere — confirm against those definitions.
func (*EventRoutingRemoved) behaviourEvent() {}
func (*EventRoutingRemoved) routingNotification() {}

// EventBootstrapFinished is emitted by the coordinator when a bootstrap has finished, either through
// running to completion or by being canceled.
type EventBootstrapFinished struct {
Expand All @@ -149,3 +163,23 @@ type EventBootstrapFinished struct {

// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventBootstrapFinished satisfy the BehaviourEvent and
// routing-notification interfaces declared elsewhere — confirm against those definitions.
func (*EventBootstrapFinished) behaviourEvent() {}
func (*EventBootstrapFinished) routingNotification() {}

// EventNotifyConnectivity notifies a behaviour that a peer's connectivity and support for finding closer nodes
// has been confirmed, such as from a successful query response or an inbound query. This should not be used for
// general connections to the host but only when it is confirmed that the peer responds to requests for closer
// nodes.
type EventNotifyConnectivity struct {
// NodeInfo identifies the peer whose connectivity was confirmed.
// Coordinator.NotifyConnectivity populates only its ID.
NodeInfo peer.AddrInfo
}

// Marker methods with empty bodies; they carry no behaviour of their own.
// NOTE(review): this type carries routingNotification while its counterpart
// EventNotifyNonConnectivity carries routingCommand — confirm which marker is intended.
func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingNotification() {}

// EventNotifyNonConnectivity notifies a behaviour that a peer is known not to have connectivity and/or
// not to support finding closer nodes.
type EventNotifyNonConnectivity struct {
// NodeID is the ID of the peer that failed the connectivity check.
NodeID peer.ID
}

// Marker methods with empty bodies; they carry no behaviour of their own.
// Presumably they let *EventNotifyNonConnectivity satisfy the BehaviourEvent and
// routing-command interfaces declared elsewhere — confirm against those definitions.
func (*EventNotifyNonConnectivity) behaviourEvent() {}
func (*EventNotifyNonConnectivity) routingCommand() {}
5 changes: 5 additions & 0 deletions v2/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
CloserNodes: sliceOfAddrInfoToSliceOfKadPeerID(ev.CloserNodes),
}
case *EventGetCloserNodesFailure:
// queue an event that will notify the routing behaviour of a failed node
p.pending = append(p.pending, &EventNotifyNonConnectivity{
ev.To.ID,
})

cmd = &query.EventPoolFindCloserFailure[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.To.ID),
QueryID: ev.QueryID,
Expand Down
10 changes: 10 additions & 0 deletions v2/coord/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,18 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {

switch tev := ev.(type) {
case *EventQueryCancel:
span.SetAttributes(tele.AttrEvent("EventQueryCancel"))
q.markFinished()
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
}
case *EventQueryFindCloserResponse[K, N]:
span.SetAttributes(tele.AttrEvent("EventQueryFindCloserResponse"))
q.onMessageResponse(ctx, tev.NodeID, tev.CloserNodes)
case *EventQueryFindCloserFailure[K, N]:
span.SetAttributes(tele.AttrEvent("EventQueryFindCloserFailure"))
span.RecordError(tev.Error)
q.onMessageFailure(ctx, tev.NodeID)
case nil:
// TEMPORARY: no event to process
Expand Down Expand Up @@ -170,6 +174,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
q.inFlight--
q.stats.Failure++
} else if atCapacity() {
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span
returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -186,6 +191,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
// If it has contacted at least NumResults nodes successfully then the iteration is done.
if !progressing && successes >= q.cfg.NumResults {
q.markFinished()
span.SetAttributes(tele.AttrOutEvent("StateQueryFinished")) // this is the query's tracing span
returnState = &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -202,6 +208,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
if q.stats.Start.IsZero() {
q.stats.Start = q.cfg.Clock.Now()
}
span.SetAttributes(tele.AttrOutEvent("StateQueryFindCloser")) // this is the query's tracing span
returnState = &StateQueryFindCloser[K, N]{
NodeID: ni.NodeID,
QueryID: q.id,
Expand All @@ -211,6 +218,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
return true

}
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span
returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -233,6 +241,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {

if q.inFlight > 0 {
// The iterator is still waiting for results and not at capacity
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingWithCapacity"))
return &StateQueryWaitingWithCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -242,6 +251,7 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
// The iterator is finished because all available nodes have been contacted
// and the iterator is not waiting for any more results.
q.markFinished()
span.SetAttributes(tele.AttrOutEvent("StateQueryFinished"))
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
Expand Down
45 changes: 43 additions & 2 deletions v2/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
}

case *EventRoutingUpdated:
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"))
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeInfo.ID.String()))
cmd := &routing.EventProbeAdd[kadt.Key, kadt.PeerID]{
NodeID: addrInfoToKadPeerID(ev.NodeInfo),
}
Expand Down Expand Up @@ -201,6 +201,41 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
}
case *EventNotifyConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeInfo.ID.String()))
// ignore self
if ev.NodeInfo.ID == peer.ID(r.self) {
break
}
// tell the include state machine in case this is a new peer that could be added to the routing table
cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.NodeInfo.ID),
}
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

// tell the probe state machine in case there are connectivity checks that could be satisfied
cmdProbe := &routing.EventProbeNotifyConnectivity[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.NodeInfo.ID),
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}
case *EventNotifyNonConnectivity:
	// Label the span with this event's own name so that traces can tell a failed
	// connectivity check apart from a successful EventNotifyConnectivity.
	span.SetAttributes(attribute.String("event", "EventNotifyNonConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

	// tell the probe state machine to remove the node from the routing table and probe list
	cmdProbe := &routing.EventProbeRemove[kadt.Key, kadt.PeerID]{
		NodeID: kadt.PeerID(ev.NodeID),
	}
	// queue any follow-up event produced by the probe state machine
	nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
	if ok {
		r.pending = append(r.pending, nextProbe)
	}

default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))
}
Expand Down Expand Up @@ -351,7 +386,13 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve
Notify: r,
}, true
case *routing.StateProbeNodeFailure[kadt.Key, kadt.PeerID]:
// a node has failed a connectivity check been removed from the routing table and the probe list
// a node has failed a connectivity check and been removed from the routing table and the probe list

// emit an EventRoutingRemoved event to notify clients that the node has been removed
r.pending = append(r.pending, &EventRoutingRemoved{
NodeID: peer.ID(st.NodeID),
})

// add the node to the inclusion list for a second chance
r.notify(ctx, &EventAddAddrInfo{
NodeInfo: kadPeerIDToAddrInfo(st.NodeID),
Expand Down
14 changes: 14 additions & 0 deletions v2/coord/routing/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"go.opentelemetry.io/otel/attribute"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord/query"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
Expand Down Expand Up @@ -96,6 +97,7 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst

switch tev := ev.(type) {
case *EventBootstrapStart[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapStart"))

// TODO: ignore start event if query is already in progress
iter := query.NewClosestNodesIter[K, N](b.self.Key())
Expand All @@ -116,17 +118,21 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst
return b.advanceQuery(ctx, nil)

case *EventBootstrapFindCloserResponse[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserResponse"))
return b.advanceQuery(ctx, &query.EventQueryFindCloserResponse[K, N]{
NodeID: tev.NodeID,
CloserNodes: tev.CloserNodes,
})
case *EventBootstrapFindCloserFailure[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserFailure"))
span.RecordError(tev.Error)
return b.advanceQuery(ctx, &query.EventQueryFindCloserFailure[K, N]{
NodeID: tev.NodeID,
Error: tev.Error,
})

case *EventBootstrapPoll:
span.SetAttributes(tele.AttrEvent("EventBootstrapPoll"))
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))
Expand All @@ -140,36 +146,44 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst
}

func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) BootstrapState {
ctx, span := tele.StartSpan(ctx, "Bootstrap.advanceQuery")
defer span.End()
state := b.qry.Advance(ctx, qev)
switch st := state.(type) {
case *query.StateQueryFindCloser[K, N]:
	// Record the bootstrap state actually returned, matching the other branches,
	// rather than the name of the underlying query state.
	span.SetAttributes(attribute.String("out_state", "StateBootstrapFindCloser"))
	return &StateBootstrapFindCloser[K, N]{
		QueryID: st.QueryID,
		Stats:   st.Stats,
		NodeID:  st.NodeID,
		Target:  st.Target,
	}
case *query.StateQueryFinished:
span.SetAttributes(attribute.String("out_state", "StateBootstrapFinished"))
return &StateBootstrapFinished{
Stats: st.Stats,
}
case *query.StateQueryWaitingAtCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
}
}
span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting"))
return &StateBootstrapWaiting{
Stats: st.Stats,
}
case *query.StateQueryWaitingWithCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
}
}
span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting"))
return &StateBootstrapWaiting{
Stats: st.Stats,
}
Expand Down
Loading

0 comments on commit 1d35505

Please sign in to comment.