refactor: v2 simplify tracing #924

Merged
merged 59 commits into from
Sep 19, 2023
Commits
f88eb9e
WIP
dennis-tra Aug 28, 2023
3898f39
revise protobuf
dennis-tra Aug 29, 2023
d31e9c9
remove gogo protobuf dependency
dennis-tra Aug 30, 2023
d2cfda6
WIP
dennis-tra Aug 30, 2023
f2f22a3
add kadt package
dennis-tra Aug 30, 2023
7e8f1df
Add routing test
dennis-tra Aug 30, 2023
95520ac
add custom zikade dependency
dennis-tra Aug 30, 2023
b2dbd5a
Merge branch 'v2-develop' into zikade-import
dennis-tra Aug 30, 2023
fc2d370
Merge branch 'v2-develop' into zikade-import
dennis-tra Sep 1, 2023
d0c67d3
Import zikade code
iand Sep 1, 2023
55fac36
Remove generics from zikade imported code
iand Sep 4, 2023
7add530
Update to latest go-kademlia
iand Sep 4, 2023
25bcc7f
Cleanup naming of events
iand Sep 4, 2023
fbe3f44
Minor naming cleanup
iand Sep 4, 2023
300e402
Merge pull request #875 from libp2p/zikade-import-id
iand Sep 5, 2023
dca5c5d
Change maintainers for v2 while being developed
iand Sep 5, 2023
afa8051
remove Zikade dependency
dennis-tra Sep 5, 2023
79c7009
Consolidate type parameters
dennis-tra Sep 5, 2023
1d7cdea
Change config test structure
dennis-tra Sep 5, 2023
e8d82f1
use opentelemetry
dennis-tra Sep 6, 2023
165b00b
use convenience attribute methods
dennis-tra Sep 6, 2023
413aefd
let coord package use tele
dennis-tra Sep 6, 2023
53821dd
fix golint warnings
dennis-tra Sep 6, 2023
05e2d90
use clock.Clock
dennis-tra Sep 6, 2023
5d7df1c
add telemetry context tests
dennis-tra Sep 6, 2023
a749dcf
Improve telemetry documentation
dennis-tra Sep 6, 2023
885bd82
Merge branch 'v2-develop' into zikade-import
dennis-tra Sep 7, 2023
5fa755b
fix test race
dennis-tra Sep 7, 2023
6170d5d
fix garbage collection race
dennis-tra Sep 7, 2023
e1708f9
Add lean bootstrapper Musa
dennis-tra Sep 6, 2023
8757aa7
extend bootstrap tracing
dennis-tra Sep 6, 2023
15c545c
Only enable telemetry if configured
dennis-tra Sep 7, 2023
3e0cff3
use slog for logging of config
dennis-tra Sep 7, 2023
1325685
document otel provider configs
dennis-tra Sep 7, 2023
dda0c65
Rename ProtocolIPFS to ProtocolAmino
dennis-tra Sep 7, 2023
43b9a8c
Merge branch 'v2-develop' into musa
dennis-tra Sep 8, 2023
7c377cd
Add bootstrap peers configuration option
dennis-tra Sep 8, 2023
8a8ab7d
Add done channel to coordinator
dennis-tra Sep 8, 2023
1c77e7c
Extract amino backend initialization
dennis-tra Sep 8, 2023
9519f91
use mocked clock in tests
dennis-tra Sep 8, 2023
5366dd7
skip flaky legacy test
dennis-tra Sep 8, 2023
d5db301
Merge branch 'v2-develop' into musa
dennis-tra Sep 18, 2023
cbb12e0
update trace provider init
dennis-tra Sep 18, 2023
ba9f167
init coodinator
dennis-tra Sep 18, 2023
060456d
consolidate trace attribute
dennis-tra Sep 18, 2023
f19cc39
wip
dennis-tra Sep 18, 2023
7b27703
Decouple coord package from addressing
iand Sep 18, 2023
a418e18
Go fmt
iand Sep 18, 2023
e294795
fix: garbage collection test race condition (#904)
dennis-tra Sep 19, 2023
b9b0a83
Fix test flakes that wait for routing events (#905)
iand Sep 19, 2023
564404e
Merge branch 'v2-coord-addr' into musa
dennis-tra Sep 19, 2023
b832362
WIP
dennis-tra Sep 19, 2023
cf32850
WIP
dennis-tra Sep 19, 2023
4e76a7d
Merge branch 'v2-develop' into musa
dennis-tra Sep 19, 2023
6e17354
WIP
dennis-tra Sep 19, 2023
2dc716c
WIP
dennis-tra Sep 19, 2023
b7314ff
WIP
dennis-tra Sep 19, 2023
079ae15
WIP
dennis-tra Sep 19, 2023
fbe6e3f
WIP
dennis-tra Sep 19, 2023
30 changes: 25 additions & 5 deletions v2/config.go
Expand Up @@ -8,6 +8,7 @@ import (
ds "github.com/ipfs/go-datastore"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand All @@ -27,9 +28,9 @@ import (
const ServiceName = "libp2p.DHT"

const (
- // ProtocolIPFS is the protocol identifier for the main IPFS network. If the
- // DHT is configured with this protocol, you must configure backends for
- // IPNS, Public Key, and provider records (ipns, pk, and providers
+ // ProtocolIPFS is the protocol identifier for the main Amino DHT network.
+ // If the DHT is configured with this protocol, you must configure backends
+ // for IPNS, Public Key, and provider records (ipns, pk, and providers
// namespaces). Configuration validation will fail if backends are missing.
ProtocolIPFS protocol.ID = "/ipfs/kad/1.0.0"

Expand Down Expand Up @@ -117,6 +118,10 @@ type Config struct {
// BucketSize determines the number of closer peers to return
BucketSize int

// BootstrapPeers is the list of peers that should be used to bootstrap
// into the DHT network.
BootstrapPeers []peer.AddrInfo

// ProtocolID represents the DHT [protocol] we can query with and respond to.
//
// [protocol]: https://docs.libp2p.io/concepts/fundamentals/protocols/
Expand Down Expand Up @@ -167,10 +172,16 @@ type Config struct {
// used to filter out private addresses.
AddressFilter AddressFilter

- // MeterProvider .
+ // MeterProvider provides access to named Meter instances. It's used to,
+ // e.g., expose prometheus metrics. Check out the [opentelemetry docs]:
+ //
+ // [opentelemetry docs]: https://opentelemetry.io/docs/specs/otel/metrics/api/#meterprovider
MeterProvider metric.MeterProvider

- // TracerProvider .
+ // TracerProvider provides Tracers that are used by instrumentation code to
+ // trace computational workflows. Check out the [opentelemetry docs]:
+ //
+ // [opentelemetry docs]: https://opentelemetry.io/docs/concepts/signals/traces/#tracer-provider
TracerProvider trace.TracerProvider
}

Expand All @@ -184,6 +195,7 @@ func DefaultConfig() *Config {
Mode: ModeOptAutoClient,
Kademlia: coord.DefaultCoordinatorConfig(),
BucketSize: 20, // MAGIC
BootstrapPeers: DefaultBootstrapPeers(),
ProtocolID: ProtocolIPFS,
RoutingTable: nil, // nil because a routing table requires information about the local node. triert.TrieRT will be used if this field is nil.
Backends: map[string]Backend{}, // if empty and [ProtocolIPFS] is used, it'll be populated with the ipns, pk and providers backends
Expand Down Expand Up @@ -238,6 +250,14 @@ func (c *Config) Validate() error {
return fmt.Errorf("invalid kademlia configuration: %w", err)
}

if c.BucketSize == 0 {
return fmt.Errorf("bucket size must not be 0")
}

if len(c.BootstrapPeers) == 0 {
return fmt.Errorf("no bootstrap peer")
}

if c.ProtocolID == "" {
return fmt.Errorf("protocolID must not be empty")
}
Expand Down
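
The two checks added to `Validate` can be exercised in isolation. The following is a minimal sketch, not the actual implementation: `Config` here is a hypothetical, trimmed-down stand-in for the v2 struct, and `BootstrapPeers` is modelled as a `[]string` instead of `[]peer.AddrInfo` so the example needs only the standard library.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a hypothetical stand-in for the v2 Config struct. Only the fields
// covered by the checks in the hunk above are modelled.
type Config struct {
	BucketSize     int
	BootstrapPeers []string // assumption: the real field is []peer.AddrInfo
	ProtocolID     string
}

// Validate applies the checks in the same order as the diff.
func (c *Config) Validate() error {
	if c.BucketSize == 0 {
		return errors.New("bucket size must not be 0")
	}
	if len(c.BootstrapPeers) == 0 {
		return errors.New("no bootstrap peer")
	}
	if c.ProtocolID == "" {
		return errors.New("protocolID must not be empty")
	}
	return nil
}

func main() {
	cfg := &Config{
		BucketSize:     20,
		BootstrapPeers: []string{"peer-a"},
		ProtocolID:     "/ipfs/kad/1.0.0",
	}
	fmt.Println("populated config:", cfg.Validate())

	// len() treats a nil slice and an empty slice alike, so both are rejected.
	cfg.BootstrapPeers = nil
	fmt.Println("nil bootstrap peers:", cfg.Validate())
}
```

Because the check uses `len`, a caller who wants to run without any bootstrap peers cannot express that through this field; in this sketch, as in the diff, an empty list is always a validation error.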
13 changes: 13 additions & 0 deletions v2/config_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -100,4 +101,16 @@ func TestConfig_Validate(t *testing.T) {
cfg.Clock = nil
assert.Error(t, cfg.Validate())
})

t.Run("zero bucket size", func(t *testing.T) {
cfg := DefaultConfig()
cfg.BucketSize = 0
assert.Error(t, cfg.Validate())
})

t.Run("empty bootstrap peers", func(t *testing.T) {
cfg := DefaultConfig()
cfg.BootstrapPeers = []peer.AddrInfo{}
assert.Error(t, cfg.Validate())
})
}
13 changes: 12 additions & 1 deletion v2/coord/coordinator.go
Expand Up @@ -33,6 +33,11 @@
// cancel is used to cancel all running goroutines when the coordinator is cleaning up
cancel context.CancelFunc

// done will be closed when the coordinator's eventLoop exits. Block-read
// from this channel to wait until resources of this coordinator were
// cleaned up
done chan struct{}

// cfg is a copy of the optional configuration supplied to the dht
cfg CoordinatorConfig

Expand Down Expand Up @@ -162,7 +167,7 @@

func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Message], rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
cfg = DefaultCoordinatorConfig()

} else if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -170,8 +175,8 @@
// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}


qpCfg := query.DefaultPoolConfig()
qpCfg.Clock = cfg.Clock
Expand All @@ -180,7 +185,7 @@
qpCfg.QueryConcurrency = cfg.RequestConcurrency
qpCfg.RequestTimeout = cfg.RequestTimeout

- qp, err := query.NewPool[kadt.Key](kadt.PeerID(self), qpCfg)
+ qp, err := query.NewPool[kadt.Key](self, qpCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}
Expand Down Expand Up @@ -235,11 +240,13 @@
rtr: rtr,
rt: rt,
cancel: cancel,
done: make(chan struct{}),

networkBehaviour: networkBehaviour,
routingBehaviour: routingBehaviour,
queryBehaviour: queryBehaviour,
}

go d.eventLoop(ctx)

return d, nil
Expand All @@ -248,14 +255,17 @@
// Close cleans up all resources associated with this Coordinator.
func (c *Coordinator) Close() error {
c.cancel()
<-c.done
return nil
}

func (c *Coordinator) ID() kadt.PeerID {

return c.self
}

func (c *Coordinator) eventLoop(ctx context.Context) {
defer close(c.done)

ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.eventLoop")
defer span.End()
for {
Expand Down Expand Up @@ -331,7 +341,7 @@

// GetValue requests that the node return any value associated with the supplied key.
// If the node does not have a value for the key it returns ErrValueNotFound.
func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (Value, error) {

panic("not implemented")
}

Expand Down Expand Up @@ -446,6 +456,7 @@
func (c *Coordinator) Bootstrap(ctx context.Context, seeds []kadt.PeerID) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Bootstrap")
defer span.End()

c.routingBehaviour.Notify(ctx, &EventStartBootstrap{
SeedNodes: seeds,
})
Expand All @@ -468,15 +479,15 @@

// NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check
// which means it is not connected and/or it doesn't support finding closer nodes
func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.NotifyNonConnectivity")
defer span.End()

c.routingBehaviour.Notify(ctx, &EventNotifyNonConnectivity{
NodeID: id,
})

return nil

}

// A BufferedRoutingNotifier is a [RoutingNotifier] that buffers [RoutingNotification] events and provides methods
Expand Down Expand Up @@ -519,8 +530,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for event %T", expected)

case <-w.signal:
}
}
Expand All @@ -545,8 +556,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing updated event")

case <-w.signal:
}
}
Expand All @@ -571,8 +582,8 @@

// wait to be signaled that there is a new event
select {
case <-ctx.Done():
return nil, fmt.Errorf("test deadline exceeded while waiting for routing removed event")

case <-w.signal:
}
}
Expand Down
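
The `done` channel added to the coordinator lets `Close` block until the event loop has actually exited, instead of returning as soon as the context is cancelled. The pattern can be sketched with the standard library alone. The `worker` type, its `stopped` field, and the ticker below are hypothetical and only stand in for the coordinator and its event loop.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// worker is a hypothetical stand-in for the Coordinator.
type worker struct {
	cancel  context.CancelFunc
	done    chan struct{} // closed when loop exits
	stopped bool          // written by loop before done is closed
}

func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	w := &worker{cancel: cancel, done: make(chan struct{})}
	go w.loop(ctx)
	return w
}

// loop runs until the context is cancelled. The deferred close signals every
// reader of w.done that clean-up has finished.
func (w *worker) loop(ctx context.Context) {
	defer close(w.done)

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			w.stopped = true
			return
		case <-ticker.C:
			// periodic work would go here
		}
	}
}

// Close cancels the loop and waits for it to exit. Receiving from a closed
// channel never blocks, so calling Close more than once is safe.
func (w *worker) Close() error {
	w.cancel()
	<-w.done
	return nil
}

func main() {
	w := newWorker()
	if err := w.Close(); err != nil {
		fmt.Println("close:", err)
	}
	fmt.Println("loop exited before Close returned:", w.stopped)
}
```

Closing the channel, rather than sending a value on it, is what makes the signal visible to any number of waiters. The write to `stopped` happens before the close, so reading it after `<-w.done` is free of data races.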
23 changes: 10 additions & 13 deletions v2/coord/query/query.go
Expand Up @@ -9,6 +9,7 @@
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"github.com/plprobelab/go-kademlia/key"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)
Expand Down Expand Up @@ -93,15 +94,15 @@

func NewQuery[K kad.Key[K], N kad.NodeID[K]](self N, id QueryID, target K, iter NodeIter[K, N], knownClosestNodes []N, cfg *QueryConfig[K]) (*Query[K, N], error) {
if cfg == nil {
cfg = DefaultQueryConfig[K]()

} else if err := cfg.Validate(); err != nil {
return nil, err
}


for _, node := range knownClosestNodes {
// exclude self from closest nodes
if key.Equal(node.Key(), self.Key()) {
continue

}
iter.Add(&NodeStatus[K, N]{
NodeID: node,
Expand All @@ -118,9 +119,13 @@
}, nil
}

- func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
- ctx, span := tele.StartSpan(ctx, "Query.Advance")
- defer span.End()
+ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) (out QueryState) {
+ ctx, span := tele.StartSpan(ctx, "Query.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
+ defer func() {
+ span.SetAttributes(tele.AttrOutEvent(out))
+ span.End()
+ }()

if q.finished {
return &StateQueryFinished{
QueryID: q.id,
Expand All @@ -130,23 +135,20 @@

switch tev := ev.(type) {
case *EventQueryCancel:
- span.SetAttributes(tele.AttrEvent("EventQueryCancel"))
q.markFinished()
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
}
case *EventQueryFindCloserResponse[K, N]:
- span.SetAttributes(tele.AttrEvent("EventQueryFindCloserResponse"))
q.onMessageResponse(ctx, tev.NodeID, tev.CloserNodes)
case *EventQueryFindCloserFailure[K, N]:
- span.SetAttributes(tele.AttrEvent("EventQueryFindCloserFailure"))
span.RecordError(tev.Error)
q.onMessageFailure(ctx, tev.NodeID)
case nil:
// TEMPORARY: no event to process
default:
panic(fmt.Sprintf("unexpected event: %T", tev))

}

// count number of successes in the order of the iteration
Expand Down Expand Up @@ -174,7 +176,6 @@
q.inFlight--
q.stats.Failure++
} else if atCapacity() {
- span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span
returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -191,7 +192,6 @@
// If it has contacted at least NumResults nodes successfully then the iteration is done.
if !progressing && successes >= q.cfg.NumResults {
q.markFinished()
- span.SetAttributes(tele.AttrOutEvent("StateQueryFinished")) // this is the query's tracing span
returnState = &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -208,28 +208,27 @@
if q.stats.Start.IsZero() {
q.stats.Start = q.cfg.Clock.Now()
}
- span.SetAttributes(tele.AttrOutEvent("StateQueryFindCloser")) // this is the query's tracing span
returnState = &StateQueryFindCloser[K, N]{
NodeID: ni.NodeID,
QueryID: q.id,
Stats: q.stats,
Target: q.target,
}
return true

}
- span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span

returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
}

return true

case *StateNodeUnresponsive:
// ignore
case *StateNodeFailed:
// ignore
default:
panic(fmt.Sprintf("unexpected state: %T", ni.State))

}

return false
Expand All @@ -241,7 +240,6 @@

if q.inFlight > 0 {
// The iterator is still waiting for results and not at capacity
- span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingWithCapacity"))
return &StateQueryWaitingWithCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -251,7 +249,6 @@
// The iterator is finished because all available nodes have been contacted
// and the iterator is not waiting for any more results.
q.markFinished()
- span.SetAttributes(tele.AttrOutEvent("StateQueryFinished"))
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -276,20 +273,20 @@
case *StateNodeWaiting:
q.inFlight--
q.stats.Success++
case *StateNodeUnresponsive:
q.stats.Success++


case *StateNodeNotContacted:
// ignore duplicate or late response
return
case *StateNodeFailed:
// ignore duplicate or late response
return
case *StateNodeSucceeded:
// ignore duplicate or late response
return
default:
panic(fmt.Sprintf("unexpected state: %T", st))

}

// add closer nodes to list
Expand All @@ -310,27 +307,27 @@
func (q *Query[K, N]) onMessageFailure(ctx context.Context, node N) {
ni, found := q.iter.Find(node.Key())
if !found {
// got a rogue message
return
}

switch st := ni.State.(type) {
case *StateNodeWaiting:
q.inFlight--
q.stats.Failure++
case *StateNodeUnresponsive:
// update node state to failed
break
case *StateNodeNotContacted:
// update node state to failed
break
case *StateNodeFailed:
// ignore duplicate or late response
return
case *StateNodeSucceeded:
// ignore duplicate or late response
return
default:
panic(fmt.Sprintf("unexpected state: %T", st))

}

ni.State = &StateNodeFailed{}
Expand Down Expand Up @@ -367,10 +364,10 @@
}

// queryState() ensures that only [Query] states can be assigned to a QueryState.
func (*StateQueryFinished) queryState() {}
func (*StateQueryFindCloser[K, N]) queryState() {}
func (*StateQueryWaitingAtCapacity) queryState() {}
func (*StateQueryWaitingWithCapacity) queryState() {}


type QueryEvent interface {
queryEvent()
Expand All @@ -392,6 +389,6 @@
}

// queryEvent() ensures that only events accepted by [Query] can be assigned to a [QueryEvent].
func (*EventQueryCancel) queryEvent() {}
func (*EventQueryFindCloserResponse[K, N]) queryEvent() {}
func (*EventQueryFindCloserFailure[K, N]) queryEvent() {}
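
The reworked `Query.Advance` replaces the per-branch `SetAttributes` calls with one deferred closure that reads a named result. A minimal sketch of that Go idiom follows. The `span` type is a hypothetical stand-in that only records strings (it is not the OpenTelemetry API), and the attribute key `out_event` is an assumption, since the diff does not show which key `tele.AttrOutEvent` uses.

```go
package main

import "fmt"

// span is a hypothetical stand-in for a tracing span. It only records string
// attributes and whether End was called.
type span struct {
	attrs map[string]string
	ended bool
}

func (s *span) SetAttribute(key, value string) { s.attrs[key] = value }
func (s *span) End()                           { s.ended = true }

// typeName renders the dynamic type of v, including pointer and package.
func typeName(v any) string { return fmt.Sprintf("%T", v) }

// Hypothetical states, loosely modelled on the query states in the diff.
type state interface{ isState() }

type stateFinished struct{}
type stateWaiting struct{}

func (*stateFinished) isState() {}
func (*stateWaiting) isState()  {}

// advance declares a named result so that the deferred closure can read the
// value set by whichever return statement ran. Deferred functions execute
// after the result has been assigned but before the caller receives it.
func advance(s *span, finished bool) (out state) {
	defer func() {
		s.SetAttribute("out_event", typeName(out)) // key is an assumption
		s.End()
	}()

	if finished {
		return &stateFinished{}
	}
	return &stateWaiting{}
}

func main() {
	s := &span{attrs: map[string]string{}}
	advance(s, true)
	fmt.Println(s.attrs["out_event"], s.ended)
}
```

The benefit is that a newly added return path is recorded without anyone remembering to annotate it. The cost is that the closure only sees the final result, so intermediate states are no longer recorded on the span.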

3 changes: 2 additions & 1 deletion v2/coord/routing.go
Expand Up @@ -59,8 +59,9 @@

// notify must only be called while r.pendingMu is held
func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
- ctx, span := r.tracer.Start(ctx, "RoutingBehaviour.notify")
+ ctx, span := r.tracer.Start(ctx, "RoutingBehaviour.notify", trace.WithAttributes(attribute.String("event", fmt.Sprintf("%T", ev))))
defer span.End()

switch ev := ev.(type) {
case *EventStartBootstrap:
span.SetAttributes(attribute.String("event", "EventStartBootstrap"))
Expand Down Expand Up @@ -128,10 +129,10 @@
NodeID: ev.To,
}
} else {
cmd = &routing.EventIncludeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: fmt.Errorf("response did not include any closer nodes"),
}

}
// attempt to advance the include
next, ok := r.advanceInclude(ctx, cmd)
Expand All @@ -140,17 +141,17 @@
}

case "probe":
var cmd routing.ProbeEvent
// require that the node responded with at least one closer node
if len(ev.CloserNodes) > 0 {
cmd = &routing.EventProbeConnectivityCheckSuccess[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
}
} else {
cmd = &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: fmt.Errorf("response did not include any closer nodes"),
}

}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
Expand Down Expand Up @@ -186,9 +187,9 @@
r.pending = append(r.pending, next)
}
case "probe":
cmd := &routing.EventProbeConnectivityCheckFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Error: ev.Err,

}
// attempt to advance the probe state machine
next, ok := r.advanceProbe(ctx, cmd)
Expand All @@ -203,7 +204,7 @@
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))
// ignore self
if r.self.Equal(ev.NodeID) {
break

}
// tell the include state machine in case this is a new peer that could be added to the routing table
cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
Expand All @@ -220,8 +221,8 @@
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}

case *EventNotifyNonConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

Expand All @@ -231,8 +232,8 @@
}
nextProbe, ok := r.advanceProbe(ctx, cmdProbe)
if ok {
r.pending = append(r.pending, nextProbe)
}


default:
panic(fmt.Sprintf("unexpected dht event: %T", ev))
Expand Down
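
`RoutingBehaviour.notify` now derives the `event` attribute from the event's dynamic type via `fmt.Sprintf("%T", ev)`. Hand-written labels can drift: in the hunks above, the `*EventNotifyNonConnectivity` case still sets the string `"EventNotifyConnectivity"`. The sketch below shows what `%T` produces. The two event types are hypothetical empty structs that merely reuse names from the diff.

```go
package main

import "fmt"

// Hypothetical event types named after two of the events in the diff.
type EventStartBootstrap struct{}
type EventNotifyNonConnectivity struct{}

// eventName derives a label from the dynamic type of ev, so the label cannot
// disagree with the type that was actually passed in.
func eventName(ev any) string {
	return fmt.Sprintf("%T", ev)
}

func main() {
	fmt.Println(eventName(&EventStartBootstrap{}))        // *main.EventStartBootstrap
	fmt.Println(eventName(&EventNotifyNonConnectivity{})) // *main.EventNotifyNonConnectivity
	fmt.Println(eventName(nil))                           // <nil>
}
```

Note that the derived label carries the pointer marker and the package qualifier, so it is not byte-for-byte identical to a literal such as `"EventStartBootstrap"`. Dashboards or queries that filter on the old literal values would presumably need adjusting.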
10 changes: 3 additions & 7 deletions v2/coord/routing/bootstrap.go
Expand Up @@ -9,6 +9,7 @@
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord/query"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
Expand Down Expand Up @@ -79,10 +80,10 @@

func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig[K]) (*Bootstrap[K, N], error) {
if cfg == nil {
cfg = DefaultBootstrapConfig[K]()

} else if err := cfg.Validate(); err != nil {
return nil, err
}


return &Bootstrap[K, N]{
self: self,
Expand All @@ -92,13 +93,11 @@

// Advance advances the state of the bootstrap by attempting to advance its query if running.
func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState {
- ctx, span := tele.StartSpan(ctx, "Bootstrap.Advance")
+ ctx, span := tele.StartSpan(ctx, "Bootstrap.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer span.End()

switch tev := ev.(type) {
case *EventBootstrapStart[K, N]:
- span.SetAttributes(tele.AttrEvent("EventBootstrapStart"))

// TODO: ignore start event if query is already in progress
iter := query.NewClosestNodesIter[K, N](b.self.Key())

Expand All @@ -111,31 +110,28 @@

qry, err := query.NewQuery[K, N](b.self, queryID, b.self.Key(), iter, tev.KnownClosestNodes, qryCfg)
if err != nil {
// TODO: don't panic
panic(err)

}
b.qry = qry
return b.advanceQuery(ctx, nil)

case *EventBootstrapFindCloserResponse[K, N]:
- span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserResponse"))
return b.advanceQuery(ctx, &query.EventQueryFindCloserResponse[K, N]{
NodeID: tev.NodeID,
CloserNodes: tev.CloserNodes,
})
case *EventBootstrapFindCloserFailure[K, N]:
- span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserFailure"))
span.RecordError(tev.Error)
return b.advanceQuery(ctx, &query.EventQueryFindCloserFailure[K, N]{
NodeID: tev.NodeID,
Error: tev.Error,
})


case *EventBootstrapPoll:
- span.SetAttributes(tele.AttrEvent("EventBootstrapPoll"))
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))

}

if b.qry != nil {
Expand Down Expand Up @@ -166,11 +162,11 @@
case *query.StateQueryWaitingAtCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
}
}

span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting"))
return &StateBootstrapWaiting{
Stats: st.Stats,
Expand All @@ -178,17 +174,17 @@
case *query.StateQueryWaitingWithCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
}
}

span.SetAttributes(attribute.String("out_state", "StateBootstrapWaiting"))
return &StateBootstrapWaiting{
Stats: st.Stats,
}
default:
panic(fmt.Sprintf("unexpected state: %T", st))

}
}

Expand Down Expand Up @@ -224,11 +220,11 @@
}

// bootstrapState() ensures that only Bootstrap states can be assigned to a BootstrapState.
func (*StateBootstrapFindCloser[K, N]) bootstrapState() {}
func (*StateBootstrapIdle) bootstrapState() {}
func (*StateBootstrapFinished) bootstrapState() {}
func (*StateBootstrapTimeout) bootstrapState() {}
func (*StateBootstrapWaiting) bootstrapState() {}

Check warning on line 227 in v2/coord/routing/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/routing/bootstrap.go#L223-L227

Added lines #L223 - L227 were not covered by tests

// BootstrapEvent is an event intended to advance the state of a bootstrap.
type BootstrapEvent interface {
Expand Down Expand Up @@ -256,7 +252,7 @@
}

// bootstrapEvent() ensures that only events accepted by a [Bootstrap] can be assigned to the [BootstrapEvent] interface.
func (*EventBootstrapPoll) bootstrapEvent() {}
func (*EventBootstrapStart[K, N]) bootstrapEvent() {}
func (*EventBootstrapFindCloserResponse[K, N]) bootstrapEvent() {}
func (*EventBootstrapFindCloserFailure[K, N]) bootstrapEvent() {}
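
The bootstrap state machine decides between `StateBootstrapTimeout` and `StateBootstrapWaiting` by comparing `Clock.Since(st.Stats.Start)` against the configured timeout, and the injected clock makes that decision testable. The following sketch reproduces the comparison with a hand-rolled fake clock. The `Clock` interface, `fakeClock`, and `timedOut` are hypothetical names; the real code uses `github.com/benbjohnson/clock`.

```go
package main

import (
	"fmt"
	"time"
)

// Clock is a hypothetical, minimal clock abstraction for this sketch.
type Clock interface {
	Now() time.Time
}

// fakeClock only moves when Add is called, which keeps tests deterministic.
type fakeClock struct{ now time.Time }

func newFakeClock() *fakeClock { return &fakeClock{now: time.Unix(0, 0)} }

func (f *fakeClock) Now() time.Time      { return f.now }
func (f *fakeClock) Add(d time.Duration) { f.now = f.now.Add(d) }

// timedOut reports whether strictly more than timeout has elapsed since
// start, matching the `elapsed > timeout` comparison in the diff.
func timedOut(c Clock, start time.Time, timeout time.Duration) bool {
	return c.Now().Sub(start) > timeout
}

func main() {
	clk := newFakeClock()
	start := clk.Now()
	timeout := 5 * time.Minute

	clk.Add(timeout)
	fmt.Println("at the deadline:", timedOut(clk, start, timeout))

	clk.Add(time.Nanosecond)
	fmt.Println("just past the deadline:", timedOut(clk, start, timeout))
}
```

Because the comparison is strict, a query whose elapsed time equals the timeout exactly is still reported as waiting, and only the next tick moves it to the timeout state.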

19 changes: 7 additions & 12 deletions v2/coord/routing/include.go
Expand Up @@ -6,10 +6,10 @@
"time"

"github.com/benbjohnson/clock"

"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"github.com/plprobelab/go-kademlia/key"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)
Expand Down Expand Up @@ -85,10 +85,10 @@

func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *IncludeConfig) (*Include[K, N], error) {
if cfg == nil {
cfg = DefaultIncludeConfig()

} else if err := cfg.Validate(); err != nil {
return nil, err
}

return &Include[K, N]{
candidates: newNodeQueue[K, N](cfg.QueueCapacity),
@@ -99,14 +99,15 @@
}
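
The constructor above falls back to a default configuration when passed `nil` and validates any configuration the caller supplies. A minimal sketch of that pattern, where `Config`, `DefaultConfig`, `Machine`, `New` and the default values are hypothetical and only the nil-check and validation flow comes from the diff:

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a hypothetical configuration type; the field names echo the
// ones used in the diff, but the default values below are made up.
type Config struct {
	QueueCapacity int
	Concurrency   int
}

// DefaultConfig returns a configuration that is valid by construction.
func DefaultConfig() *Config {
	return &Config{QueueCapacity: 128, Concurrency: 3}
}

// Validate rejects configurations the state machine could not run with.
func (c *Config) Validate() error {
	if c.QueueCapacity < 1 {
		return errors.New("queue capacity must be greater than zero")
	}
	if c.Concurrency < 1 {
		return errors.New("concurrency must be greater than zero")
	}
	return nil
}

// Machine is a hypothetical stand-in for the constructed state machine.
type Machine struct{ cfg Config }

// New uses defaults for a nil config and validates a caller-supplied one.
func New(cfg *Config) (*Machine, error) {
	if cfg == nil {
		cfg = DefaultConfig()
	} else if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return &Machine{cfg: *cfg}, nil
}

func main() {
	m, err := New(nil)
	fmt.Println(m.cfg.QueueCapacity, err)

	_, err = New(&Config{})
	fmt.Println(err)
}
```

Skipping validation on the default path is safe only as long as the defaults themselves stay valid, so a test that calls `DefaultConfig().Validate()` is a cheap guard.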

// Advance advances the state of the include state machine by attempting to advance its query if running.
- func (b *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) IncludeState {
- ctx, span := tele.StartSpan(ctx, "Include.Advance")
- defer span.End()
+ func (b *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out IncludeState) {
+ ctx, span := tele.StartSpan(ctx, "Include.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
+ defer func() {
+ span.SetAttributes(tele.AttrOutEvent(out))
+ span.End()
+ }()

switch tev := ev.(type) {
-
case *EventIncludeAddCandidate[K, N]:
- span.SetAttributes(tele.AttrEvent("EventIncludeAddCandidate"))
// Ignore if already running a check
_, checking := b.checks[key.HexString(tev.NodeID.Key())]
if checking {
@@ -125,27 +126,23 @@
b.candidates.Enqueue(ctx, tev.NodeID)

case *EventIncludeConnectivityCheckSuccess[K, N]:
- span.SetAttributes(tele.AttrEvent("EventIncludeConnectivityCheckSuccess"))
ch, ok := b.checks[key.HexString(tev.NodeID.Key())]
if ok {
delete(b.checks, key.HexString(tev.NodeID.Key()))
if b.rt.AddNode(tev.NodeID) {
- span.SetAttributes(tele.AttrOutEvent("StateIncludeRoutingUpdated"))
return &StateIncludeRoutingUpdated[K, N]{
NodeID: ch.NodeID,
}
}
}
case *EventIncludeConnectivityCheckFailure[K, N]:
- span.SetAttributes(tele.AttrEvent("EventIncludeConnectivityCheckFailure"))
span.RecordError(tev.Error)
delete(b.checks, key.HexString(tev.NodeID.Key()))

case *EventIncludePoll:
- span.SetAttributes(tele.AttrEvent("EventIncludePoll"))
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))
}

if len(b.checks) == b.cfg.Concurrency {
@@ -159,7 +156,6 @@
if !ok {
// No candidate in queue
if len(b.checks) > 0 {
- span.SetAttributes(tele.AttrOutEvent("StateIncludeWaitingWithCapacity"))
return &StateIncludeWaitingWithCapacity{}
}
return &StateIncludeIdle{}
@@ -171,7 +167,6 @@
}

// Ask the node to find itself
- span.SetAttributes(tele.AttrOutEvent("StateIncludeConnectivityCheck"))
return &StateIncludeConnectivityCheck[K, N]{
NodeID: candidate,
}
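
The change to `Advance` in this file replaces per-branch `span.SetAttributes` calls with a named result that a single deferred closure reads just before the span ends. A minimal sketch of that technique without the OpenTelemetry dependency, where `span`, `machine` and the attribute keys are hypothetical stand-ins for the real `tele` helpers:

```go
package main

import "fmt"

// span is a hypothetical stand-in for a tracing span; it only collects
// attributes so the effect of the deferred closure can be observed.
type span struct {
	attrs []string
	ended bool
}

func (s *span) SetAttribute(k, v string) { s.attrs = append(s.attrs, k+"="+v) }
func (s *span) End()                     { s.ended = true }

// machine keeps the last span around purely for inspection in this sketch.
type machine struct{ last *span }

// Advance declares a named result so the deferred closure can read the
// value that was returned, whichever return statement produced it.
func (m *machine) Advance(ev string) (out string) {
	sp := &span{}
	m.last = sp
	sp.SetAttribute("in_event", ev)
	defer func() {
		// Runs after out has been assigned by the return statement.
		sp.SetAttribute("out_state", out)
		sp.End()
	}()

	switch ev {
	case "add":
		return "checking"
	case "poll":
		return "idle"
	}
	return "unknown"
}

func main() {
	m := &machine{}
	m.Advance("add")
	fmt.Println(m.last.attrs)
}
```

The benefit is that new return paths cannot forget to record the output state, at the cost of one closure per call.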
@@ -196,12 +191,12 @@
// added and false otherwise.
func (q *nodeQueue[K, N]) Enqueue(ctx context.Context, id N) bool {
if len(q.nodes) == q.capacity {
return false
}

if _, exists := q.keys[key.HexString(id.Key())]; exists {
return false
}

q.nodes = append(q.nodes, id)
q.keys[key.HexString(id.Key())] = struct{}{}
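
`Enqueue` above rejects a node when the queue is at capacity or when the node's key is already queued. A minimal sketch of such a bounded, de-duplicating FIFO queue using plain strings as IDs; `boundedQueue`, `newBoundedQueue` and the `Dequeue` behaviour shown are assumptions for illustration, not the `nodeQueue` implementation from this PR:

```go
package main

import "fmt"

// boundedQueue is a hypothetical FIFO queue with a fixed capacity that
// refuses duplicates by tracking the queued IDs in a set.
type boundedQueue struct {
	capacity int
	items    []string
	seen     map[string]struct{}
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{capacity: capacity, seen: make(map[string]struct{})}
}

// Enqueue reports whether id was added. It returns false if the queue is
// full or if id is already queued.
func (q *boundedQueue) Enqueue(id string) bool {
	if len(q.items) == q.capacity {
		return false
	}
	if _, exists := q.seen[id]; exists {
		return false
	}
	q.items = append(q.items, id)
	q.seen[id] = struct{}{}
	return true
}

// Dequeue removes and returns the oldest id, forgetting it so that it can
// be queued again later. This behaviour is an assumption of the sketch.
func (q *boundedQueue) Dequeue() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	id := q.items[0]
	q.items = q.items[1:]
	delete(q.seen, id)
	return id, true
}

func main() {
	q := newBoundedQueue(2)
	fmt.Println(q.Enqueue("a"), q.Enqueue("a"), q.Enqueue("b"), q.Enqueue("c"))
}
```

Keeping the set alongside the slice makes the duplicate check constant time, which matters if candidates arrive far more often than they are dequeued.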
@@ -259,12 +254,12 @@
}

// includeState() ensures that only Include states can be assigned to an IncludeState.
func (*StateIncludeConnectivityCheck[K, N]) includeState() {}
func (*StateIncludeIdle) includeState() {}
func (*StateIncludeWaitingAtCapacity) includeState() {}
func (*StateIncludeWaitingWithCapacity) includeState() {}
func (*StateIncludeWaitingFull) includeState() {}
func (*StateIncludeRoutingUpdated[K, N]) includeState() {}

// IncludeEvent is an event intended to advance the state of an [Include].
type IncludeEvent interface {
@@ -291,7 +286,7 @@
}

// includeEvent() ensures that only events accepted by an [Include] can be assigned to the [IncludeEvent] interface.
func (*EventIncludePoll) includeEvent() {}
func (*EventIncludeAddCandidate[K, N]) includeEvent() {}
func (*EventIncludeConnectivityCheckSuccess[K, N]) includeEvent() {}
func (*EventIncludeConnectivityCheckFailure[K, N]) includeEvent() {}
