Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid panic when node is re-added to probe list #902

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions v2/coord/routing/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@

func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *ProbeConfig) (*Probe[K, N], error) {
if cfg == nil {
cfg = DefaultProbeConfig()

Check warning on line 123 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L123

Added line #L123 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

Check warning on line 126 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L125-L126

Added lines #L125 - L126 were not covered by tests

return &Probe[K, N]{
cfg: *cfg,
Expand Down Expand Up @@ -171,9 +171,9 @@
span.SetAttributes(tele.AttrEvent("EventProbeMessageResponse"), attribute.String("nodeid", tev.NodeID.String()))
nv, found := p.nvl.Get(tev.NodeID)
if !found {
// ignore message for unknown node, which might have been removed
span.RecordError(errors.New("node not in node value list"))
break

Check warning on line 176 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L174-L176

Added lines #L174 - L176 were not covered by tests
}
// update next check time
nv.NextCheckDue = p.cfg.Clock.Now().Add(p.cfg.CheckInterval)
Expand Down Expand Up @@ -204,8 +204,8 @@
// put into list, which will clear any ongoing check too
p.nvl.Put(nv)

default:
panic(fmt.Sprintf("unexpected event: %T", tev))

Check warning on line 208 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L207-L208

Added lines #L207 - L208 were not covered by tests
}

// Check if there is capacity
Expand Down Expand Up @@ -276,11 +276,11 @@
}

// probeState is an unexported marker method with an empty body, implemented by
// each of the probe state types below. It ensures that only Probe states can be
// assigned to the ProbeState interface.
func (*StateProbeConnectivityCheck[K, N]) probeState() {}
func (*StateProbeIdle) probeState() {}
func (*StateProbeWaitingAtCapacity) probeState() {}
func (*StateProbeWaitingWithCapacity) probeState() {}
func (*StateProbeNodeFailure[K, N]) probeState() {}

Check warning on line 283 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L279-L283

Added lines #L279 - L283 were not covered by tests

// ProbeEvent is an event intended to advance the state of a probe.
type ProbeEvent interface {
Expand Down Expand Up @@ -317,12 +317,12 @@
}

// probeEvent is an unexported marker method with an empty body, implemented by
// each of the event types below. It ensures that only events accepted by a
// [Probe] can be assigned to the [ProbeEvent] interface.
func (*EventProbePoll) probeEvent() {}
func (*EventProbeAdd[K, N]) probeEvent() {}
func (*EventProbeRemove[K, N]) probeEvent() {}
func (*EventProbeConnectivityCheckSuccess[K, N]) probeEvent() {}
func (*EventProbeConnectivityCheckFailure[K, N]) probeEvent() {}
func (*EventProbeNotifyConnectivity[K, N]) probeEvent() {}

Check warning on line 325 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L320-L325

Added lines #L320 - L325 were not covered by tests

type nodeValue[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N
Expand All @@ -334,11 +334,12 @@

// nodeValueEntry pairs a nodeValue with its position in an ordering.
//
// NOTE(review): this text is a rendered diff without +/- markers; the two index
// lines below appear to be the old and new versions of the same field, not two
// separate fields — confirm against the actual file.
type nodeValueEntry[K kad.Key[K], N kad.NodeID[K]] struct {
nv *nodeValue[K, N] // the node value held by this entry
index int // the index of the item in the ordering
index int // the index of the item in the ordering, set to -1 when the item is popped from the heap
}

type nodeValueList[K kad.Key[K], N kad.NodeID[K]] struct {
nodes map[string]*nodeValueEntry[K, N]
nodes map[string]*nodeValueEntry[K, N]
// pending is a list of nodes ordered by the time of the next check
pending *nodeValuePendingList[K, N]
// ongoing is a list of nodes with ongoing/in-progress probes, loosely ordered earliest to most recent
ongoing []N
Expand All @@ -359,14 +360,20 @@
nve, exists := l.nodes[mk]
if !exists {
nve = &nodeValueEntry[K, N]{
nv: nv,
nv: nv,
index: -1,
}
l.nodes[mk] = nve
} else {
nve.nv = nv
heap.Remove(l.pending, nve.index)
}
heap.Push(l.pending, nve)
l.nodes[mk] = nve

// nve.index is -1 when the node is not already in the pending list,
// either because it is new or because it has an ongoing check
if nve.index == -1 {
heap.Push(l.pending, nve)
}

heap.Fix(l.pending, nve.index)
l.removeFromOngoing(nv.NodeID)
}
Expand Down Expand Up @@ -415,9 +422,9 @@
mk := key.HexString(n.Key())
nve, ok := l.nodes[mk]
if !ok {
// somehow the node doesn't exist so this is an obvious candidate for removal
return n, true
}

Check warning on line 427 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L425-L427

Added lines #L425 - L427 were not covered by tests
if !nve.nv.CheckDeadline.After(ts) {
return n, true
}
Expand All @@ -431,9 +438,9 @@
for i := range l.ongoing {
if key.Equal(n.Key(), l.ongoing[i].Key()) {
if len(l.ongoing) > 1 {
// swap with last entry
l.ongoing[i], l.ongoing[len(l.ongoing)-1] = l.ongoing[len(l.ongoing)-1], l.ongoing[i]
}

Check warning on line 443 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L441-L443

Added lines #L441 - L443 were not covered by tests
// remove last entry
var v N
l.ongoing[len(l.ongoing)-1] = v
Expand Down Expand Up @@ -466,8 +473,8 @@
mk := key.HexString(n.Key())
nve, ok := l.nodes[mk]
if !ok {
return
}

Check warning on line 477 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L476-L477

Added lines #L476 - L477 were not covered by tests
nve.nv.CheckDeadline = deadline
l.nodes[mk] = nve
heap.Remove(l.pending, nve.index)
Expand Down Expand Up @@ -502,8 +509,8 @@

func (o *nodeValuePendingList[K, N]) Pop() any {
if len(*o) == 0 {
return nil
}

Check warning on line 513 in v2/coord/routing/probe.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/probe.go#L512-L513

Added lines #L512 - L513 were not covered by tests
old := *o
n := len(old)
v := old[n-1]
Expand Down
22 changes: 21 additions & 1 deletion v2/coord/routing/probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func TestNodeValueList(t *testing.T) {
require.Equal(t, 0, l.OngoingCount())
require.Equal(t, 1, l.NodeCount())

l.MarkOngoing(tiny.NewNode(5), clk.Now().Add(time.Minute))
l.MarkOngoing(nv1.NodeID, clk.Now().Add(time.Minute))
require.Equal(t, 0, l.PendingCount())
require.Equal(t, 1, l.OngoingCount())
require.Equal(t, 1, l.NodeCount())
Expand Down Expand Up @@ -582,6 +582,26 @@ func TestNodeValueList(t *testing.T) {
require.Equal(t, 0, l.OngoingCount())
require.Equal(t, 1, l.NodeCount())
})

t.Run("mark ongoing pending mixed", func(t *testing.T) {
t.Parallel()

clk := clock.NewMock()
l := NewNodeValueList[tiny.Key, tiny.Node]()
nv1 := &nodeValue[tiny.Key, tiny.Node]{
NodeID: tiny.NewNode(5),
NextCheckDue: clk.Now().Add(time.Minute),
}
nv2 := &nodeValue[tiny.Key, tiny.Node]{
NodeID: tiny.NewNode(6),
NextCheckDue: clk.Now().Add(time.Minute),
}

l.Put(nv1)
l.Put(nv2)
l.MarkOngoing(nv1.NodeID, clk.Now().Add(time.Minute))
l.Put(nv1)
})
}

func TestProbeConnectivityCheckSuccess(t *testing.T) {
Expand Down
Loading