Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test query interactions with routing table #887

Merged
merged 30 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4f0bd42
Test query interactions with routing table
iand Sep 8, 2023
8cf5bbc
v2: upgrade to go1.21
iand Sep 11, 2023
b1b4661
Add uci config
iand Sep 11, 2023
3997993
Use newer uci actions
iand Sep 11, 2023
55a4448
Merge branch 'v2-develop' into v2-query
iand Sep 11, 2023
fa75b51
Merge branch 'v2-develop' into v2-query
iand Sep 11, 2023
bd533e5
Pass clock to coordinator
iand Sep 12, 2023
77ca4ec
Fix merge conflict
iand Sep 12, 2023
d7feea9
Use v2 working directory in actions
iand Sep 12, 2023
1299359
Set go-version input in actions
iand Sep 13, 2023
ce1fa35
Set go-version input in actions
iand Sep 13, 2023
52555e3
Use go 1.20.8 in actions
iand Sep 13, 2023
9838121
Use go 1.21.1 and relative working directory
iand Sep 13, 2023
f08a1ea
Try default working directory on job
iand Sep 13, 2023
bf14d3e
Remove uci.yaml which is not supported yet
iand Sep 13, 2023
0e6e4d2
Try default working directory on job
iand Sep 13, 2023
318fdaa
Try default working directory as input
iand Sep 13, 2023
c5768c2
Restore uci.yaml
iand Sep 13, 2023
576d604
Restore uci.yaml
iand Sep 13, 2023
54cc8e8
Use modified go-check
iand Sep 13, 2023
3826f9d
Use modified go-test
iand Sep 13, 2023
f131d09
Fix go-test
iand Sep 13, 2023
b8662b9
Fix go-test
iand Sep 13, 2023
e94e01e
Fix go-test
iand Sep 13, 2023
22ad479
Merge branch 'v2-develop' into v2-go121
iand Sep 13, 2023
120c051
Merge branch 'v2-go121' into v2-query
iand Sep 13, 2023
26632d7
Update go-kademlia
iand Sep 13, 2023
3118105
Add more tracing
iand Sep 13, 2023
094a4b7
Use go-kademlia trie fix
iand Sep 13, 2023
3ee7700
Merge branch 'v2-develop' into v2-query
iand Sep 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@

func NewCoordinator(self kadt.PeerID, rtr Router, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
cfg = DefaultCoordinatorConfig()

Check warning on line 153 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L153

Added line #L153 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -158,8 +158,8 @@
// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}

Check warning on line 162 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L161-L162

Added lines #L161 - L162 were not covered by tests

qpCfg := query.DefaultPoolConfig()
qpCfg.Clock = cfg.Clock
Expand Down Expand Up @@ -241,13 +241,13 @@
return nil
}

func (c *Coordinator) ID() kadt.PeerID {

Check warning on line 244 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L244

Added line #L244 was not covered by tests
return c.self
}

func (c *Coordinator) Addresses() []ma.Multiaddr {
// TODO: return configured listen addresses
info, err := c.rtr.GetNodeInfo(context.TODO(), peer.ID(c.self))

Check warning on line 250 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L250

Added line #L250 was not covered by tests
if err != nil {
return nil
}
Expand Down Expand Up @@ -308,6 +308,8 @@
// GetNode retrieves the node associated with the given node id from the DHT's local routing table.
// If the node isn't found in the table, it returns ErrNodeNotFound.
func (c *Coordinator) GetNode(ctx context.Context, id peer.ID) (Node, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetNode")
defer span.End()
if _, exists := c.rt.GetNode(kadt.PeerID(id).Key()); !exists {
return nil, ErrNodeNotFound
}
Expand All @@ -321,6 +323,8 @@

// GetClosestNodes requests the n closest nodes to the key from the node's local routing table.
func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]Node, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.GetClosestNodes")
defer span.End()
closest := c.rt.NearestNodes(k, n)
nodes := make([]Node, 0, len(closest))
for _, id := range closest {
Expand All @@ -335,7 +339,7 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (Value, error) {

Check warning on line 342 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L342

Added line #L342 was not covered by tests
panic("not implemented")
}

Expand Down Expand Up @@ -462,3 +466,32 @@

return nil
}

// NotifyConnectivity tells the coordinator that the peer identified by id has passed a
// connectivity check, i.e. it is connected and supports finding closer nodes.
//
// The notification is forwarded to the routing behaviour as an [EventNotifyConnectivity]
// whose NodeInfo carries only the peer ID; no addresses are attached. The call is traced
// under the span name "Coordinator.NotifyConnectivity". The returned error is always nil.
func (c *Coordinator) NotifyConnectivity(ctx context.Context, id peer.ID) error {
	ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyConnectivity")
	defer span.End()

	// Only the ID is known at this point, so the address list is left empty.
	ev := &EventNotifyConnectivity{
		NodeInfo: peer.AddrInfo{ID: id},
	}
	c.routingBehaviour.Notify(ctx, ev)

	return nil
}

// NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check
// which means it is not connected and/or it doesn't support finding closer nodes
func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id peer.ID) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})

return nil

Check warning on line 496 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L488-L496

Added lines #L488 - L496 were not covered by tests
}
38 changes: 36 additions & 2 deletions v2/coord/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,32 @@
func (*EventStopQuery) behaviourEvent() {}
func (*EventStopQuery) queryCommand() {}

// EventAddAddrInfo notifies the routing behaviour of a potential new peer or of additional addresses for
// an existing peer.
type EventAddAddrInfo struct {
NodeInfo peer.AddrInfo
}

func (*EventAddAddrInfo) behaviourEvent() {}
func (*EventAddAddrInfo) routingCommand() {}

// EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event has produced a successful response.
type EventGetCloserNodesSuccess struct {
QueryID query.QueryID
To peer.AddrInfo
To peer.AddrInfo // To is the peer address that the GetCloserNodes request was sent to.
Target kadt.Key
CloserNodes []peer.AddrInfo
}

func (*EventGetCloserNodesSuccess) behaviourEvent() {}
func (*EventGetCloserNodesSuccess) nodeHandlerResponse() {}

// EventGetCloserNodesFailure notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event has failed to produce a valid response.
type EventGetCloserNodesFailure struct {
QueryID query.QueryID
To peer.AddrInfo
To peer.AddrInfo // To is the peer address that the GetCloserNodes request was sent to.
Target kadt.Key
Err error
}
Expand Down Expand Up @@ -141,6 +147,14 @@
func (*EventRoutingUpdated) behaviourEvent() {}
func (*EventRoutingUpdated) routingNotification() {}

// EventRoutingRemoved is emitted by the coordinator when a node has been removed from the routing table.
type EventRoutingRemoved struct {
// NodeID identifies the node that was removed.
NodeID peer.ID
}

func (*EventRoutingRemoved) behaviourEvent() {}
func (*EventRoutingRemoved) routingNotification() {}

Check warning on line 156 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L155-L156

Added lines #L155 - L156 were not covered by tests

// EventBootstrapFinished is emitted by the coordinator when a bootstrap has finished, either through
// running to completion or by being canceled.
type EventBootstrapFinished struct {
Expand All @@ -149,3 +163,23 @@

func (*EventBootstrapFinished) behaviourEvent() {}
func (*EventBootstrapFinished) routingNotification() {}

// EventNotifyConnectivity notifies a behaviour that a peer's connectivity and support for finding closer
// nodes have been confirmed, for example by a successful query response or an inbound query. It should not
// be used for general connections to the host, only when it is confirmed that the peer responds to requests
// for closer nodes.
type EventNotifyConnectivity struct {
NodeInfo peer.AddrInfo
}

func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingNotification() {}

Check warning on line 176 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L175-L176

Added lines #L175 - L176 were not covered by tests

// EventNotifyNonConnectivity notifies a behaviour that a peer is known not to have connectivity and/or not
// to support finding closer nodes.
type EventNotifyNonConnectivity struct {
// NodeID identifies the peer that failed the connectivity check.
NodeID peer.ID
}

func (*EventNotifyNonConnectivity) behaviourEvent() {}
func (*EventNotifyNonConnectivity) routingCommand() {}

Check warning on line 185 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L184-L185

Added lines #L184 - L185 were not covered by tests
5 changes: 5 additions & 0 deletions v2/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (p *PooledQueryBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {
CloserNodes: sliceOfAddrInfoToSliceOfKadPeerID(ev.CloserNodes),
}
case *EventGetCloserNodesFailure:
// queue an event that will notify the routing behaviour of a failed node
p.pending = append(p.pending, &EventNotifyNonConnectivity{
ev.To.ID,
})

cmd = &query.EventPoolFindCloserFailure[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.To.ID),
QueryID: ev.QueryID,
Expand Down
10 changes: 10 additions & 0 deletions v2/coord/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@

func NewQuery[K kad.Key[K], N kad.NodeID[K]](self N, id QueryID, target K, iter NodeIter[K, N], knownClosestNodes []N, cfg *QueryConfig[K]) (*Query[K, N], error) {
if cfg == nil {
cfg = DefaultQueryConfig[K]()

Check warning on line 96 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L96

Added line #L96 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 99 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L98-L99

Added lines #L98 - L99 were not covered by tests

for _, node := range knownClosestNodes {
// exclude self from closest nodes
if key.Equal(node.Key(), self.Key()) {
continue

Check warning on line 104 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L104

Added line #L104 was not covered by tests
}
iter.Add(&NodeStatus[K, N]{
NodeID: node,
Expand Down Expand Up @@ -130,19 +130,23 @@

switch tev := ev.(type) {
case *EventQueryCancel:
span.SetAttributes(tele.AttrEvent("EventQueryCancel"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could have something like:

span.SetAttributes(tele.AttrEvent(fmt.Sprintf("%T", ev)))

at the top of this method. fmt is already imported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or add it right to L122

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wary of adding allocations for tracing since they will affect performance

q.markFinished()
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
}
case *EventQueryFindCloserResponse[K, N]:
span.SetAttributes(tele.AttrEvent("EventQueryFindCloserResponse"))
q.onMessageResponse(ctx, tev.NodeID, tev.CloserNodes)
case *EventQueryFindCloserFailure[K, N]:
span.SetAttributes(tele.AttrEvent("EventQueryFindCloserFailure"))
span.RecordError(tev.Error)
q.onMessageFailure(ctx, tev.NodeID)
case nil:
// TEMPORARY: no event to process
default:
panic(fmt.Sprintf("unexpected event: %T", tev))

Check warning on line 149 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L148-L149

Added lines #L148 - L149 were not covered by tests
}

// count number of successes in the order of the iteration
Expand Down Expand Up @@ -170,6 +174,7 @@
q.inFlight--
q.stats.Failure++
} else if atCapacity() {
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span
returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -186,6 +191,7 @@
// If it has contacted at least NumResults nodes successfully then the iteration is done.
if !progressing && successes >= q.cfg.NumResults {
q.markFinished()
span.SetAttributes(tele.AttrOutEvent("StateQueryFinished")) // this is the query's tracing span
returnState = &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -202,6 +208,7 @@
if q.stats.Start.IsZero() {
q.stats.Start = q.cfg.Clock.Now()
}
span.SetAttributes(tele.AttrOutEvent("StateQueryFindCloser")) // this is the query's tracing span
returnState = &StateQueryFindCloser[K, N]{
NodeID: ni.NodeID,
QueryID: q.id,
Expand All @@ -211,17 +218,18 @@
return true

}
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span
returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
}
return true

Check warning on line 226 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L221-L226

Added lines #L221 - L226 were not covered by tests
case *StateNodeUnresponsive:
// ignore
case *StateNodeFailed:
// ignore
default:
panic(fmt.Sprintf("unexpected state: %T", ni.State))

Check warning on line 232 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L231-L232

Added lines #L231 - L232 were not covered by tests
}

return false
Expand All @@ -233,6 +241,7 @@

if q.inFlight > 0 {
// The iterator is still waiting for results and not at capacity
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingWithCapacity"))
return &StateQueryWaitingWithCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -242,6 +251,7 @@
// The iterator is finished because all available nodes have been contacted
// and the iterator is not waiting for any more results.
q.markFinished()
span.SetAttributes(tele.AttrOutEvent("StateQueryFinished"))
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -266,20 +276,20 @@
case *StateNodeWaiting:
q.inFlight--
q.stats.Success++
case *StateNodeUnresponsive:
q.stats.Success++

Check warning on line 280 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L279-L280

Added lines #L279 - L280 were not covered by tests

case *StateNodeNotContacted:
// ignore duplicate or late response
return
case *StateNodeFailed:
// ignore duplicate or late response
return
case *StateNodeSucceeded:
// ignore duplicate or late response
return
default:
panic(fmt.Sprintf("unexpected state: %T", st))

Check warning on line 292 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L282-L292

Added lines #L282 - L292 were not covered by tests
}

// add closer nodes to list
Expand All @@ -300,27 +310,27 @@
func (q *Query[K, N]) onMessageFailure(ctx context.Context, node N) {
ni, found := q.iter.Find(node.Key())
if !found {
// got a rogue message
return
}

Check warning on line 315 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L313-L315

Added lines #L313 - L315 were not covered by tests
switch st := ni.State.(type) {
case *StateNodeWaiting:
q.inFlight--
q.stats.Failure++
case *StateNodeUnresponsive:
// update node state to failed
break
case *StateNodeNotContacted:
// update node state to failed
break
case *StateNodeFailed:
// ignore duplicate or late response
return
case *StateNodeSucceeded:
// ignore duplicate or late response
return
default:
panic(fmt.Sprintf("unexpected state: %T", st))

Check warning on line 333 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L320-L333

Added lines #L320 - L333 were not covered by tests
}

ni.State = &StateNodeFailed{}
Expand Down Expand Up @@ -357,10 +367,10 @@
}

// queryState() ensures that only [Query] states can be assigned to a QueryState.
func (*StateQueryFinished) queryState() {}
func (*StateQueryFindCloser[K, N]) queryState() {}
func (*StateQueryWaitingAtCapacity) queryState() {}
func (*StateQueryWaitingWithCapacity) queryState() {}

Check warning on line 373 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L370-L373

Added lines #L370 - L373 were not covered by tests

type QueryEvent interface {
queryEvent()
Expand All @@ -382,6 +392,6 @@
}

// queryEvent() ensures that only events accepted by [Query] can be assigned to a [QueryEvent].
func (*EventQueryCancel) queryEvent() {}
func (*EventQueryFindCloserResponse[K, N]) queryEvent() {}
func (*EventQueryFindCloserFailure[K, N]) queryEvent() {}

Check warning on line 397 in v2/coord/query/query.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/query/query.go#L395-L397

Added lines #L395 - L397 were not covered by tests
45 changes: 43 additions & 2 deletions v2/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
}

case *EventRoutingUpdated:
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"))
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeInfo.ID.String()))
cmd := &routing.EventProbeAdd[kadt.Key, kadt.PeerID]{
NodeID: addrInfoToKadPeerID(ev.NodeInfo),
}
Expand Down Expand Up @@ -130,10 +130,10 @@
NodeID: kadt.PeerID(ev.To.ID),
}
} else {
cmd = &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.To.ID),
Error: fmt.Errorf("response did not include any closer nodes"),
}

Check warning on line 136 in v2/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing.go#L133-L136

Added lines #L133 - L136 were not covered by tests
}
// attempt to advance the include
next, ok := r.advanceInclude(ctx, cmd)
Expand All @@ -142,17 +142,17 @@
}

case "probe":
var cmd routing.ProbeEvent
// require that the node responded with at least one closer node
if len(ev.CloserNodes) > 0 {
cmd = &routing.EventProbeConnectivityCheckSuccess[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.To.ID),
}
} else {
cmd = &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.To.ID),
Error: fmt.Errorf("response did not include any closer nodes"),
}

Check warning on line 155 in v2/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing.go#L145-L155

Added lines #L145 - L155 were not covered by tests
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
Expand Down Expand Up @@ -188,9 +188,9 @@
r.pending = append(r.pending, next)
}
case "probe":
cmd := &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.To.ID),
Error: ev.Err,

Check warning on line 193 in v2/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing.go#L191-L193

Added lines #L191 - L193 were not covered by tests
}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
Expand All @@ -201,6 +201,41 @@
default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
}
case *EventNotifyConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeInfo.ID.String()))
// ignore self
if ev.NodeInfo.ID == peer.ID(r.self) {
break

Check warning on line 208 in v2/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing.go#L208

Added line #L208 was not covered by tests
}
// tell the include state machine in case this is a new peer that could be added to the routing table
cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.NodeInfo.ID),
}
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

// tell the probe state machine in case there are connectivity checks that could be satisfied
cmdProbe := &routing.EventProbeNotifyConnectivity[kadt.Key, kadt.PeerID]{
NodeID: kadt.PeerID(ev.NodeInfo.ID),
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}

Check warning on line 226 in v2/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing.go#L225-L226

Added lines #L225 - L226 were not covered by tests
case *EventNotifyNonConnectivity:
	// Label the span with this case's own event name. It previously reused
	// "EventNotifyConnectivity", which made failed-connectivity notifications
	// indistinguishable from successful ones in traces.
	span.SetAttributes(attribute.String("event", "EventNotifyNonConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

	// tell the probe state machine to remove the node from the routing table and probe list
	cmdProbe := &routing.EventProbeRemove[kadt.Key, kadt.PeerID]{
		NodeID: kadt.PeerID(ev.NodeID),
	}
	// queue any resulting state change so it is processed along with the other pending events
	nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
	if ok {
		r.pending = append(r.pending, nextProbe)
	}

Check warning on line 237 in v2/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing.go#L236-L237

Added lines #L236 - L237 were not covered by tests

default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))
}
Expand Down Expand Up @@ -351,7 +386,13 @@
Notify: r,
}, true
case *routing.StateProbeNodeFailure[kadt.Key, kadt.PeerID]:
// a node has failed a connectivity check been removed from the routing table and the probe list
// a node has failed a connectivity check and been removed from the routing table and the probe list

// emit an EventRoutingRemoved event to notify clients that the node has been removed
r.pending = append(r.pending, &EventRoutingRemoved{
NodeID: peer.ID(st.NodeID),
})

// add the node to the inclusion list for a second chance
r.notify(ctx, &EventAddAddrInfo{
NodeInfo: kadPeerIDToAddrInfo(st.NodeID),
Expand Down
14 changes: 14 additions & 0 deletions v2/coord/routing/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"github.com/benbjohnson/clock"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"go.opentelemetry.io/otel/attribute"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord/query"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
Expand Down Expand Up @@ -78,10 +79,10 @@

func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig[K]) (*Bootstrap[K, N], error) {
if cfg == nil {
cfg = DefaultBootstrapConfig[K]()

Check warning on line 82 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L82

Added line #L82 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 85 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L84-L85

Added lines #L84 - L85 were not covered by tests

return &Bootstrap[K, N]{
self: self,
Expand All @@ -96,6 +97,7 @@

switch tev := ev.(type) {
case *EventBootstrapStart[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapStart"))

// TODO: ignore start event if query is already in progress
iter := query.NewClosestNodesIter[K, N](b.self.Key())
Expand All @@ -109,27 +111,31 @@

qry, err := query.NewQuery[K, N](b.self, queryID, b.self.Key(), iter, tev.KnownClosestNodes, qryCfg)
if err != nil {
// TODO: don't panic
panic(err)

Check warning on line 115 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L114-L115

Added lines #L114 - L115 were not covered by tests
}
b.qry = qry
return b.advanceQuery(ctx, nil)

case *EventBootstrapFindCloserResponse[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserResponse"))
return b.advanceQuery(ctx, &query.EventQueryFindCloserResponse[K, N]{
NodeID: tev.NodeID,
CloserNodes: tev.CloserNodes,
})
case *EventBootstrapFindCloserFailure[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserFailure"))
span.RecordError(tev.Error)
return b.advanceQuery(ctx, &query.EventQueryFindCloserFailure[K, N]{
NodeID: tev.NodeID,
Error: tev.Error,
})

Check warning on line 132 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L126-L132

Added lines #L126 - L132 were not covered by tests

case *EventBootstrapPoll:
span.SetAttributes(tele.AttrEvent("EventBootstrapPoll"))
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))

Check warning on line 138 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L137-L138

Added lines #L137 - L138 were not covered by tests
}

if b.qry != nil {
Expand All @@ -140,41 +146,49 @@
}

func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) BootstrapState {
ctx, span := tele.StartSpan(ctx, "Bootstrap.advanceQuery")
defer span.End()
state := b.qry.Advance(ctx, qev)
switch st := state.(type) {
case *query.StateQueryFindCloser[K, N]:
span.SetAttributes(attribute.String("out_state", "StateQueryFindCloser"))
return &StateBootstrapFindCloser[K, N]{
QueryID: st.QueryID,
Stats: st.Stats,
NodeID: st.NodeID,
Target: st.Target,
}
case *query.StateQueryFinished:
span.SetAttributes(attribute.String("out_state", "StateBootstrapFinished"))
return &StateBootstrapFinished{
Stats: st.Stats,
}
case *query.StateQueryWaitingAtCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
}
}

Check warning on line 173 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L169-L173

Added lines #L169 - L173 were not covered by tests
span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting"))
return &StateBootstrapWaiting{
Stats: st.Stats,
}
case *query.StateQueryWaitingWithCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
}
}

Check warning on line 185 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L181-L185

Added lines #L181 - L185 were not covered by tests
span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting"))
return &StateBootstrapWaiting{
Stats: st.Stats,
}
default:
panic(fmt.Sprintf("unexpected state: %T", st))

Check warning on line 191 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L190-L191

Added lines #L190 - L191 were not covered by tests
}
}

Expand Down Expand Up @@ -210,11 +224,11 @@
}

// bootstrapState() ensures that only Bootstrap states can be assigned to a BootstrapState.
func (*StateBootstrapFindCloser[K, N]) bootstrapState() {}
func (*StateBootstrapIdle) bootstrapState() {}
func (*StateBootstrapFinished) bootstrapState() {}
func (*StateBootstrapTimeout) bootstrapState() {}
func (*StateBootstrapWaiting) bootstrapState() {}

Check warning on line 231 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L227-L231

Added lines #L227 - L231 were not covered by tests

// BootstrapEvent is an event intended to advance the state of a bootstrap.
type BootstrapEvent interface {
Expand Down Expand Up @@ -242,7 +256,7 @@
}

// bootstrapEvent() ensures that only events accepted by a [Bootstrap] can be assigned to the [BootstrapEvent] interface.
func (*EventBootstrapPoll) bootstrapEvent() {}
func (*EventBootstrapStart[K, N]) bootstrapEvent() {}
func (*EventBootstrapFindCloserResponse[K, N]) bootstrapEvent() {}
func (*EventBootstrapFindCloserFailure[K, N]) bootstrapEvent() {}

Check warning on line 262 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L259-L262

Added lines #L259 - L262 were not covered by tests
Loading
Loading