Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve query capabilities #932

Merged
merged 5 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 75 additions & 16 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"time"

"github.com/benbjohnson/clock"
uuid "github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
Expand Down Expand Up @@ -167,7 +168,7 @@

func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
cfg = DefaultCoordinatorConfig()

Check warning on line 171 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L171

Added line #L171 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -175,8 +176,8 @@
// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}

Check warning on line 180 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L179-L180

Added lines #L179 - L180 were not covered by tests

qpCfg := query.DefaultPoolConfig()
qpCfg.Clock = cfg.Clock
Expand All @@ -185,7 +186,7 @@
qpCfg.QueryConcurrency = cfg.RequestConcurrency
qpCfg.RequestTimeout = cfg.RequestTimeout

qp, err := query.NewPool[kadt.Key](self, qpCfg)
qp, err := query.NewPool[kadt.Key, kadt.PeerID, *pb.Message](self, qpCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}
Expand All @@ -197,7 +198,7 @@
bootstrapCfg.RequestConcurrency = cfg.RequestConcurrency
bootstrapCfg.RequestTimeout = cfg.RequestTimeout

bootstrap, err := routing.NewBootstrap[kadt.Key](kadt.PeerID(self), bootstrapCfg)
bootstrap, err := routing.NewBootstrap(kadt.PeerID(self), bootstrapCfg)
if err != nil {
return nil, fmt.Errorf("bootstrap: %w", err)
}
Expand Down Expand Up @@ -259,7 +260,7 @@
return nil
}

func (c *Coordinator) ID() kadt.PeerID {

Check warning on line 263 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L263

Added line #L263 was not covered by tests
return c.self
}

Expand Down Expand Up @@ -341,7 +342,7 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (Value, error) {

Check warning on line 345 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L345

Added line #L345 was not covered by tests
panic("not implemented")
}

Expand All @@ -351,8 +352,18 @@
panic("not implemented")
}

// Query traverses the DHT calling fn for each node visited.
func (c *Coordinator) Query(ctx context.Context, target kadt.Key, fn QueryFunc) (QueryStats, error) {
// QueryClosest starts a query that attempts to find the closest nodes to the target key.
// It returns the closest nodes found to the target key and statistics on the actions of the query.
//
// The supplied [QueryFunc] is called after each successful request to a node with the ID of the node,
// the response received from the find nodes request made to the node and the current query stats. The query
// terminates when [QueryFunc] returns an error or when the query has visited the configured minimum number
// of closest nodes (default 20)
//
// numResults specifies the minimum number of nodes to successfully contact before considering iteration complete.
// The query is considered to be exhausted when it has received responses from at least this number of nodes
// and there are no closer nodes remaining to be contacted. A default of 20 is used if this value is less than 1.
func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn QueryFunc, numResults int) ([]kadt.PeerID, QueryStats, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query")
defer span.End()

Expand All @@ -361,7 +372,7 @@

seeds, err := c.GetClosestNodes(ctx, target, 20)
if err != nil {
return QueryStats{}, err
return nil, QueryStats{}, err

Check warning on line 375 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L375

Added line #L375 was not covered by tests
}

seedIDs := make([]kadt.PeerID, 0, len(seeds))
Expand All @@ -370,23 +381,75 @@
}

waiter := NewWaiter[BehaviourEvent]()
queryID := query.QueryID("foo") // TODO: choose query ID
queryID := query.QueryID(uuid.New().String())
dennis-tra marked this conversation as resolved.
Show resolved Hide resolved

cmd := &EventStartQuery{
cmd := &EventStartFindCloserQuery{
QueryID: queryID,
Target: target,
KnownClosestNodes: seedIDs,
Notify: waiter,
NumResults: numResults,
}

// queue the start of the query
c.queryBehaviour.Notify(ctx, cmd)

return c.waitForQuery(ctx, queryID, waiter, fn)
}

// QueryMessage starts a query that iterates over the closest nodes to the target key in the supplied message.
// The message is sent to each node that is visited.
//
// The supplied [QueryFunc] is called after each successful request to a node with the ID of the node,
// the response received from the find nodes request made to the node and the current query stats. The query
// terminates when [QueryFunc] returns an error or when the query has visited the configured minimum number
// of closest nodes (default 20)
//
// numResults specifies the minimum number of nodes to successfully contact before considering iteration complete.
// The query is considered to be exhausted when it has received responses from at least this number of nodes
// and there are no closer nodes remaining to be contacted. A default of 20 is used if this value is less than 1.
func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn QueryFunc, numResults int) (QueryStats, error) {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage")
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make 20 configurable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this number is like a global parameter. If we set this here to e.g. 30, every other peer would still respond with the 20 closest peers it knows. The remaining ten that we receive are just other random close peers. @guillaumemichel devised a great technique to query for a wider range of peers which is more elaborate: https://www.notion.so/pl-strflt/N-closest-peers-83038476d00f4af1bfb40aeeefec39f5?pvs=4

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it should be configurable. Though here the call to GetClosestNodes is getting the 20 closest nodes from our own routing table to seed the query.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indirectly used the DHT's bucketsize config value

if err != nil {
return QueryStats{}, err
}

Check warning on line 421 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L420-L421

Added lines #L420 - L421 were not covered by tests

seedIDs := make([]kadt.PeerID, 0, len(seeds))
for _, s := range seeds {
seedIDs = append(seedIDs, kadt.PeerID(s.ID()))
}

waiter := NewWaiter[BehaviourEvent]()
queryID := query.QueryID(uuid.New().String())

cmd := &EventStartMessageQuery{
QueryID: queryID,
Target: msg.Target(),
Message: msg,
KnownClosestNodes: seedIDs,
Notify: waiter,
NumResults: numResults,
}

// queue the start of the query
c.queryBehaviour.Notify(ctx, cmd)

_, stats, err := c.waitForQuery(ctx, queryID, waiter, fn)
return stats, err
}

func (c *Coordinator) waitForQuery(ctx context.Context, queryID query.QueryID, waiter *Waiter[BehaviourEvent], fn QueryFunc) ([]kadt.PeerID, QueryStats, error) {
var lastStats QueryStats
for {
select {
case <-ctx.Done():
return lastStats, ctx.Err()
return nil, lastStats, ctx.Err()

Check warning on line 452 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L452

Added line #L452 was not covered by tests
case wev := <-waiter.Chan():
ctx, ev := wev.Ctx, wev.Event
switch ev := ev.(type) {
Expand All @@ -403,26 +466,22 @@
break
}

err = fn(ctx, nh, lastStats)
err = fn(ctx, nh.ID(), ev.Response, lastStats)
if errors.Is(err, ErrSkipRemaining) {
// done
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return lastStats, nil
}
if errors.Is(err, ErrSkipNode) {
// TODO: don't add closer nodes from this node
break
return nil, lastStats, nil
}
if err != nil {
// user defined error that terminates the query
c.queryBehaviour.Notify(ctx, &EventStopQuery{QueryID: queryID})
return lastStats, err
return nil, lastStats, err

Check warning on line 478 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L478

Added line #L478 was not covered by tests
}

case *EventQueryFinished:
// query is done
lastStats.Exhausted = true
return lastStats, nil
return ev.ClosestNodes, lastStats, nil

default:
panic(fmt.Sprintf("unexpected event: %T", ev))
Expand Down Expand Up @@ -479,15 +538,15 @@

// NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check
// which means it is not connected and/or it doesn't support finding closer nodes
func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})

return nil

Check warning on line 549 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L541-L549

Added lines #L541 - L549 were not covered by tests
}

// A BufferedRoutingNotifier is a [RoutingNotifier] that buffers [RoutingNotification] events and provides methods
Expand Down Expand Up @@ -530,8 +589,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for event %T", expected)

Check warning on line 593 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L592-L593

Added lines #L592 - L593 were not covered by tests
case <-w.signal:
}
}
Expand All @@ -556,8 +615,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing updated event")

Check warning on line 619 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L618-L619

Added lines #L618 - L619 were not covered by tests
case <-w.signal:
}
}
Expand All @@ -582,8 +641,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing removed event")

Check warning on line 645 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L644-L645

Added lines #L644 - L645 were not covered by tests
case <-w.signal:
}
}
Expand Down
11 changes: 6 additions & 5 deletions v2/coord/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
)

const peerstoreTTL = 10 * time.Minute
Expand Down Expand Up @@ -115,13 +116,13 @@ func TestExhaustiveQuery(t *testing.T) {
visited := make(map[string]int)

// Record the nodes as they are visited
qfn := func(ctx context.Context, node Node, stats QueryStats) error {
visited[node.ID().String()]++
qfn := func(ctx context.Context, id kadt.PeerID, msg *pb.Message, stats QueryStats) error {
visited[id.String()]++
return nil
}

// Run a query to find the value
_, err = c.Query(ctx, target, qfn)
_, _, err = c.QueryClosest(ctx, target, qfn, 20)
require.NoError(t, err)

require.Equal(t, 3, len(visited))
Expand Down Expand Up @@ -154,13 +155,13 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) {
log.Fatalf("unexpected error creating coordinator: %v", err)
}

qfn := func(ctx context.Context, node Node, stats QueryStats) error {
qfn := func(ctx context.Context, id kadt.PeerID, msg *pb.Message, stats QueryStats) error {
return nil
}

// Run a query to find the value
target := nodes[3].NodeID.Key()
_, err = c.Query(ctx, target, qfn)
_, _, err = c.QueryClosest(ctx, target, qfn, 20)
require.NoError(t, err)

// the query run by the dht should have received a response from nodes[1] with closer nodes
Expand Down
3 changes: 2 additions & 1 deletion v2/coord/coretypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/plprobelab/go-kademlia/kad"

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
)

// Value is a value that may be stored in the DHT.
Expand Down Expand Up @@ -49,7 +50,7 @@ var (
// Query stops entirely and returns that error.
//
// The stats argument contains statistics on the progress of the query so far.
type QueryFunc func(ctx context.Context, node Node, stats QueryStats) error
type QueryFunc func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats QueryStats) error

type QueryStats struct {
Start time.Time // Start is the time the query began executing.
Expand Down
62 changes: 56 additions & 6 deletions v2/coord/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"github.com/libp2p/go-libp2p-kad-dht/v2/coord/query"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
)

type BehaviourEvent interface {
Expand Down Expand Up @@ -60,15 +61,39 @@
func (*EventOutboundGetCloserNodes) nodeHandlerRequest() {}
func (*EventOutboundGetCloserNodes) networkCommand() {}

type EventStartQuery struct {
type EventOutboundSendMessage struct {
QueryID query.QueryID
To kadt.PeerID
Message *pb.Message
Notify Notify[BehaviourEvent]
}

func (*EventOutboundSendMessage) behaviourEvent() {}
func (*EventOutboundSendMessage) nodeHandlerRequest() {}
func (*EventOutboundSendMessage) networkCommand() {}

Check warning on line 73 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L71-L73

Added lines #L71 - L73 were not covered by tests

type EventStartMessageQuery struct {
QueryID query.QueryID
Target kadt.Key
Message *pb.Message
KnownClosestNodes []kadt.PeerID
Notify NotifyCloser[BehaviourEvent]
NumResults int // the minimum number of nodes to successfully contact before considering iteration complete
}

func (*EventStartMessageQuery) behaviourEvent() {}
func (*EventStartMessageQuery) queryCommand() {}

Check warning on line 85 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L84-L85

Added lines #L84 - L85 were not covered by tests

type EventStartFindCloserQuery struct {
QueryID query.QueryID
Target kadt.Key
KnownClosestNodes []kadt.PeerID
Notify NotifyCloser[BehaviourEvent]
NumResults int // the minimum number of nodes to successfully contact before considering iteration complete
}

func (*EventStartQuery) behaviourEvent() {}
func (*EventStartQuery) queryCommand() {}
func (*EventStartFindCloserQuery) behaviourEvent() {}
func (*EventStartFindCloserQuery) queryCommand() {}

Check warning on line 96 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L95-L96

Added lines #L95 - L96 were not covered by tests

type EventStopQuery struct {
QueryID query.QueryID
Expand All @@ -82,8 +107,8 @@
NodeID kadt.PeerID
}

func (*EventAddNode) behaviourEvent() {}
func (*EventAddNode) routingCommand() {}

Check warning on line 111 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L110-L111

Added lines #L110 - L111 were not covered by tests

// EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an
// [EventOutboundGetCloserNodes] event has produced a successful response.
Expand All @@ -109,12 +134,36 @@
func (*EventGetCloserNodesFailure) behaviourEvent() {}
func (*EventGetCloserNodesFailure) nodeHandlerResponse() {}

// EventSendMessageSuccess notifies a behaviour that a SendMessage request, initiated by an
// [EventOutboundSendMessage] event has produced a successful response.
type EventSendMessageSuccess struct {
QueryID query.QueryID
To kadt.PeerID // To is the peer that the SendMessage request was sent to.
Response *pb.Message
CloserNodes []kadt.PeerID
}

func (*EventSendMessageSuccess) behaviourEvent() {}
func (*EventSendMessageSuccess) nodeHandlerResponse() {}

Check warning on line 147 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L146-L147

Added lines #L146 - L147 were not covered by tests

// EventSendMessageFailure notifies a behaviour that a SendMessage request, initiated by an
// [EventOutboundSendMessage] event has failed to produce a valid response.
type EventSendMessageFailure struct {
QueryID query.QueryID
To kadt.PeerID // To is the peer that the SendMessage request was sent to.
Target kadt.Key
Err error
}

func (*EventSendMessageFailure) behaviourEvent() {}
func (*EventSendMessageFailure) nodeHandlerResponse() {}

Check warning on line 159 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L158-L159

Added lines #L158 - L159 were not covered by tests

// EventQueryProgressed is emitted by the coordinator when a query has received a
// response from a node.
type EventQueryProgressed struct {
QueryID query.QueryID
NodeID kadt.PeerID
Response Message
Response *pb.Message
Stats query.QueryStats
}

Expand All @@ -123,8 +172,9 @@
// EventQueryFinished is emitted by the coordinator when a query has finished, either through
// running to completion or by being canceled.
type EventQueryFinished struct {
QueryID query.QueryID
Stats query.QueryStats
QueryID query.QueryID
Stats query.QueryStats
ClosestNodes []kadt.PeerID
}

func (*EventQueryFinished) behaviourEvent() {}
Expand All @@ -142,8 +192,8 @@
NodeID kadt.PeerID
}

func (*EventRoutingRemoved) behaviourEvent() {}
func (*EventRoutingRemoved) routingNotification() {}

Check warning on line 196 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L195-L196

Added lines #L195 - L196 were not covered by tests

// EventBootstrapFinished is emitted by the coordinator when a bootstrap has finished, either through
// running to completion or by being canceled.
Expand All @@ -162,8 +212,8 @@
NodeID kadt.PeerID
}

func (*EventNotifyConnectivity) behaviourEvent() {}
func (*EventNotifyConnectivity) routingNotification() {}

Check warning on line 216 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L215-L216

Added lines #L215 - L216 were not covered by tests

// EventNotifyNonConnectivity notifies a behaviour that a peer does not have connectivity and/or does not support
// finding closer nodes is known.
Expand All @@ -171,5 +221,5 @@
NodeID kadt.PeerID
}

func (*EventNotifyNonConnectivity) behaviourEvent() {}
func (*EventNotifyNonConnectivity) routingCommand() {}

Check warning on line 225 in v2/coord/event.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/event.go#L224-L225

Added lines #L224 - L225 were not covered by tests
3 changes: 2 additions & 1 deletion v2/coord/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ var (
)

var (
_ QueryCommand = (*EventStartQuery)(nil)
_ QueryCommand = (*EventStartMessageQuery)(nil)
_ QueryCommand = (*EventStartFindCloserQuery)(nil)
_ QueryCommand = (*EventStopQuery)(nil)
)

Expand Down
4 changes: 4 additions & 0 deletions v2/coord/internal/tiny/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
key Key
}

type Message struct {
Content string
}

var _ kad.NodeID[Key] = Node{}

func NewNode(k Key) Node {
Expand All @@ -22,8 +26,8 @@
return n.key
}

func (n Node) Equal(other Node) bool {
return n.key.Compare(other.key) == 0

Check warning on line 30 in v2/coord/internal/tiny/node.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/internal/tiny/node.go#L29-L30

Added lines #L29 - L30 were not covered by tests
}

func (n Node) String() string {
Expand Down
30 changes: 30 additions & 0 deletions v2/coord/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@
}
b.nodeHandlersMu.Unlock()
nh.Notify(ctx, ev)
case *EventOutboundSendMessage:
b.nodeHandlersMu.Lock()
p := kadt.PeerID(ev.To)
nh, ok := b.nodeHandlers[p]
if !ok {
nh = NewNodeHandler(p, b.rtr, b.logger, b.tracer)
b.nodeHandlers[p] = nh
}
b.nodeHandlersMu.Unlock()
nh.Notify(ctx, ev)
default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))
}
Expand Down Expand Up @@ -160,6 +170,26 @@
Target: cmd.Target,
CloserNodes: nodes,
})
case *EventOutboundSendMessage:
if cmd.Notify == nil {
break

Check warning on line 175 in v2/coord/network.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/network.go#L175

Added line #L175 was not covered by tests
}
resp, err := h.rtr.SendMessage(ctx, h.self, cmd.Message)
if err != nil {
cmd.Notify.Notify(ctx, &EventSendMessageFailure{
QueryID: cmd.QueryID,
To: h.self,
Err: fmt.Errorf("NodeHandler: %w", err),
})
return false
}

cmd.Notify.Notify(ctx, &EventSendMessageSuccess{
QueryID: cmd.QueryID,
To: h.self,
Response: resp,
CloserNodes: resp.CloserNodes(),
})
default:
panic(fmt.Sprintf("unexpected command type: %T", cmd))
}
Expand Down Expand Up @@ -215,7 +245,7 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (h *NodeHandler) GetValue(ctx context.Context, key kadt.Key) (Value, error) {

Check warning on line 248 in v2/coord/network.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/network.go#L248

Added line #L248 was not covered by tests
panic("not implemented")
}

Expand Down
Loading
Loading