WIP: Musa Bootstrapper #878

Closed
wants to merge 55 commits
Commits
f88eb9e
WIP
dennis-tra Aug 28, 2023
3898f39
revise protobuf
dennis-tra Aug 29, 2023
d31e9c9
remove gogo protobuf dependency
dennis-tra Aug 30, 2023
d2cfda6
WIP
dennis-tra Aug 30, 2023
f2f22a3
add kadt package
dennis-tra Aug 30, 2023
7e8f1df
Add routing test
dennis-tra Aug 30, 2023
95520ac
add custom zikade dependency
dennis-tra Aug 30, 2023
b2dbd5a
Merge branch 'v2-develop' into zikade-import
dennis-tra Aug 30, 2023
fc2d370
Merge branch 'v2-develop' into zikade-import
dennis-tra Sep 1, 2023
d0c67d3
Import zikade code
iand Sep 1, 2023
55fac36
Remove generics from zikade imported code
iand Sep 4, 2023
7add530
Update to latest go-kademlia
iand Sep 4, 2023
25bcc7f
Cleanup naming of events
iand Sep 4, 2023
fbe3f44
Minor naming cleanup
iand Sep 4, 2023
300e402
Merge pull request #875 from libp2p/zikade-import-id
iand Sep 5, 2023
dca5c5d
Change maintainers for v2 while being developed
iand Sep 5, 2023
afa8051
remove Zikade dependency
dennis-tra Sep 5, 2023
79c7009
Consolidate type parameters
dennis-tra Sep 5, 2023
1d7cdea
Change config test structure
dennis-tra Sep 5, 2023
e8d82f1
use opentelemetry
dennis-tra Sep 6, 2023
165b00b
use convenience attribute methods
dennis-tra Sep 6, 2023
413aefd
let coord package use tele
dennis-tra Sep 6, 2023
53821dd
fix golint warnings
dennis-tra Sep 6, 2023
05e2d90
use clock.Clock
dennis-tra Sep 6, 2023
5d7df1c
add telemetry context tests
dennis-tra Sep 6, 2023
a749dcf
Improve telemetry documentation
dennis-tra Sep 6, 2023
885bd82
Merge branch 'v2-develop' into zikade-import
dennis-tra Sep 7, 2023
5fa755b
fix test race
dennis-tra Sep 7, 2023
6170d5d
fix garbage collection race
dennis-tra Sep 7, 2023
e1708f9
Add lean bootstrapper Musa
dennis-tra Sep 6, 2023
8757aa7
extend bootstrap tracing
dennis-tra Sep 6, 2023
15c545c
Only enable telemetry if configured
dennis-tra Sep 7, 2023
3e0cff3
use slog for logging of config
dennis-tra Sep 7, 2023
1325685
document otel provider configs
dennis-tra Sep 7, 2023
dda0c65
Rename ProtocolIPFS to ProtocolAmino
dennis-tra Sep 7, 2023
43b9a8c
Merge branch 'v2-develop' into musa
dennis-tra Sep 8, 2023
7c377cd
Add bootstrap peers configuration option
dennis-tra Sep 8, 2023
8a8ab7d
Add done channel to coordinator
dennis-tra Sep 8, 2023
1c77e7c
Extract amino backend initialization
dennis-tra Sep 8, 2023
9519f91
use mocked clock in tests
dennis-tra Sep 8, 2023
5366dd7
skip flaky legacy test
dennis-tra Sep 8, 2023
d5db301
Merge branch 'v2-develop' into musa
dennis-tra Sep 18, 2023
cbb12e0
update trace provider init
dennis-tra Sep 18, 2023
ba9f167
init coodinator
dennis-tra Sep 18, 2023
060456d
consolidate trace attribute
dennis-tra Sep 18, 2023
f19cc39
wip
dennis-tra Sep 18, 2023
7b27703
Decouple coord package from addressing
iand Sep 18, 2023
a418e18
Go fmt
iand Sep 18, 2023
e294795
fix: garbage collection test race condition (#904)
dennis-tra Sep 19, 2023
b9b0a83
Fix test flakes that wait for routing events (#905)
iand Sep 19, 2023
564404e
Merge branch 'v2-coord-addr' into musa
dennis-tra Sep 19, 2023
b832362
WIP
dennis-tra Sep 19, 2023
cf32850
WIP
dennis-tra Sep 19, 2023
4e76a7d
Merge branch 'v2-develop' into musa
dennis-tra Sep 19, 2023
6e17354
WIP
dennis-tra Sep 19, 2023
13 changes: 7 additions & 6 deletions dht_test.go
@@ -630,7 +630,7 @@ func checkForWellFormedTablesOnce(t *testing.T, dhts []*IpfsDHT, minPeers, avgPe
rtlen := dht.routingTable.Size()
totalPeers += rtlen
if minPeers > 0 && rtlen < minPeers {
//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
// t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
return false
}
}
@@ -1397,6 +1397,7 @@ func minInt(a, b int) int {
}

func TestFindPeerQueryMinimal(t *testing.T) {
t.Skip("flaky")
testFindPeerQuery(t, 2, 22, 1)
}

@@ -1568,9 +1569,7 @@ func TestProvideDisabled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
optsA, optsB []Option
)
var optsA, optsB []Option
optsA = append(optsA, ProtocolPrefix("/provMaybeDisabled"))
optsB = append(optsB, ProtocolPrefix("/provMaybeDisabled"))

@@ -1995,8 +1994,10 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) {
// convert the bootstrap addresses to a p2p address
bootstrapAddrs := make([]peer.AddrInfo, nBootStraps)
for i := 0; i < nBootStraps; i++ {
b := peer.AddrInfo{ID: bootstrappers[i].self,
Addrs: bootstrappers[i].host.Addrs()}
b := peer.AddrInfo{
ID: bootstrappers[i].self,
Addrs: bootstrappers[i].host.Addrs(),
}
bootstrapAddrs[i] = b
}

30 changes: 25 additions & 5 deletions v2/config.go
@@ -8,6 +8,7 @@ import (
ds "github.com/ipfs/go-datastore"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
@@ -27,9 +28,9 @@ import (
const ServiceName = "libp2p.DHT"

const (
// ProtocolIPFS is the protocol identifier for the main IPFS network. If the
// DHT is configured with this protocol, you must configure backends for
// IPNS, Public Key, and provider records (ipns, pk, and providers
// ProtocolIPFS is the protocol identifier for the main Amino DHT network.
// If the DHT is configured with this protocol, you must configure backends
// for IPNS, Public Key, and provider records (ipns, pk, and providers
// namespaces). Configuration validation will fail if backends are missing.
ProtocolIPFS protocol.ID = "/ipfs/kad/1.0.0"

@@ -117,6 +118,10 @@ type Config struct {
// BucketSize determines the number of closer peers to return
BucketSize int

// BootstrapPeers is the list of peers that should be used to bootstrap
// into the DHT network.
BootstrapPeers []peer.AddrInfo

// ProtocolID represents the DHT [protocol] we can query with and respond to.
//
// [protocol]: https://docs.libp2p.io/concepts/fundamentals/protocols/
@@ -167,10 +172,16 @@ type Config struct {
// used to filter out private addresses.
AddressFilter AddressFilter

// MeterProvider .
// MeterProvider provides access to named Meter instances. It's used to,
// e.g., expose prometheus metrics. Check out the [opentelemetry docs]:
//
// [opentelemetry docs]: https://opentelemetry.io/docs/specs/otel/metrics/api/#meterprovider
MeterProvider metric.MeterProvider

// TracerProvider .
// TracerProvider provides Tracers that are used by instrumentation code to
// trace computational workflows. Check out the [opentelemetry docs]:
//
// [opentelemetry docs]: https://opentelemetry.io/docs/concepts/signals/traces/#tracer-provider
TracerProvider trace.TracerProvider
}

@@ -184,6 +195,7 @@ func DefaultConfig() *Config {
Mode: ModeOptAutoClient,
Kademlia: coord.DefaultCoordinatorConfig(),
BucketSize: 20, // MAGIC
BootstrapPeers: DefaultBootstrapPeers(),
ProtocolID: ProtocolIPFS,
RoutingTable: nil, // nil because a routing table requires information about the local node. triert.TrieRT will be used if this field is nil.
Backends: map[string]Backend{}, // if empty and [ProtocolIPFS] is used, it'll be populated with the ipns, pk and providers backends
@@ -238,6 +250,14 @@ func (c *Config) Validate() error {
return fmt.Errorf("invalid kademlia configuration: %w", err)
}

if c.BucketSize == 0 {
return fmt.Errorf("bucket size must not be 0")
}

if len(c.BootstrapPeers) == 0 {
return fmt.Errorf("no bootstrap peer")
}

if c.ProtocolID == "" {
return fmt.Errorf("protocolID must not be empty")
}
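The config changes above suggest the intended usage: build a config with DefaultConfig, optionally override BootstrapPeers and the OpenTelemetry providers, and let Validate reject an empty bootstrap list or a zero bucket size. The sketch below shows that flow; the field and function names (DefaultConfig, Validate, BootstrapPeers, MeterProvider, TracerProvider, DefaultBootstrapPeers) come from the diff, while the import path and alias of the v2 package are assumptions, and the multiaddress is one of the public libp2p bootstrappers used purely for illustration.

package main

import (
	"fmt"
	"log"

	dht "github.com/libp2p/go-libp2p-kad-dht/v2" // assumed import path/alias for the v2 package
	"github.com/libp2p/go-libp2p/core/peer"
	"go.opentelemetry.io/otel"
)

func main() {
	// DefaultConfig now ships with DefaultBootstrapPeers() and ProtocolIPFS.
	cfg := dht.DefaultConfig()

	// Override the bootstrap set with a single well-known peer.
	ai, err := peer.AddrInfoFromString("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
	if err != nil {
		log.Fatal(err)
	}
	cfg.BootstrapPeers = []peer.AddrInfo{*ai}

	// Wire up telemetry. The global providers are no-ops unless an SDK
	// (e.g. a Prometheus exporter or an OTLP tracer) has been installed.
	cfg.MeterProvider = otel.GetMeterProvider()
	cfg.TracerProvider = otel.GetTracerProvider()

	// Validate now rejects a zero bucket size and an empty bootstrap list.
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid DHT config: %v", err)
	}
	fmt.Printf("config ok, %d bootstrap peer(s)\n", len(cfg.BootstrapPeers))
}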
14 changes: 14 additions & 0 deletions v2/config_test.go
@@ -4,6 +4,8 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/stretchr/testify/assert"
)

@@ -100,4 +102,16 @@ func TestConfig_Validate(t *testing.T) {
cfg.Clock = nil
assert.Error(t, cfg.Validate())
})

t.Run("zero bucket size", func(t *testing.T) {
cfg := DefaultConfig()
cfg.BucketSize = 0
assert.Error(t, cfg.Validate())
})

t.Run("empty bootstrap peers", func(t *testing.T) {
cfg := DefaultConfig()
cfg.BootstrapPeers = []peer.AddrInfo{}
assert.Error(t, cfg.Validate())
})
}
13 changes: 12 additions & 1 deletion v2/coord/coordinator.go
@@ -33,6 +33,11 @@ type Coordinator struct {
// cancel is used to cancel all running goroutines when the coordinator is cleaning up
cancel context.CancelFunc

// done will be closed when the coordinator's eventLoop exits. Block-read
// from this channel to wait until this coordinator's resources have been
// cleaned up.
done chan struct{}

// cfg is a copy of the optional configuration supplied to the dht
cfg CoordinatorConfig

@@ -180,7 +185,7 @@ func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Mess
qpCfg.QueryConcurrency = cfg.RequestConcurrency
qpCfg.RequestTimeout = cfg.RequestTimeout

qp, err := query.NewPool[kadt.Key](kadt.PeerID(self), qpCfg)
qp, err := query.NewPool[kadt.Key](self, qpCfg)
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}
@@ -235,11 +240,13 @@ func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Mess
rtr: rtr,
rt: rt,
cancel: cancel,
done: make(chan struct{}),

networkBehaviour: networkBehaviour,
routingBehaviour: routingBehaviour,
queryBehaviour: queryBehaviour,
}

go d.eventLoop(ctx)

return d, nil
@@ -248,6 +255,7 @@ func NewCoordinator(self kadt.PeerID, rtr Router[kadt.Key, kadt.PeerID, *pb.Mess
// Close cleans up all resources associated with this Coordinator.
func (c *Coordinator) Close() error {
c.cancel()
<-c.done
return nil
}

@@ -256,6 +264,8 @@ func (c *Coordinator) ID() kadt.PeerID {
}

func (c *Coordinator) eventLoop(ctx context.Context) {
defer close(c.done)

ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.eventLoop")
defer span.End()
for {
@@ -446,6 +456,7 @@ func (c *Coordinator) AddNodes(ctx context.Context, ids []kadt.PeerID) error {
func (c *Coordinator) Bootstrap(ctx context.Context, seeds []kadt.PeerID) error {
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Bootstrap")
defer span.End()

c.routingBehaviour.Notify(ctx, &EventStartBootstrap{
SeedNodes: seeds,
})
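The coordinator change pairs the existing cancel function with a done channel: eventLoop closes done in a defer, and Close cancels the context and then blocks on <-c.done, so Close only returns once the loop has actually exited and released its resources. A self-contained sketch of that shutdown pattern follows; the names loop, run, and newLoop are illustrative and do not appear in the PR.

package main

import (
	"context"
	"fmt"
	"time"
)

// loop mirrors the Coordinator.eventLoop shutdown pattern from the diff:
// the goroutine owns the done channel and closes it on exit.
type loop struct {
	cancel context.CancelFunc
	done   chan struct{}
}

func newLoop() *loop {
	ctx, cancel := context.WithCancel(context.Background())
	l := &loop{cancel: cancel, done: make(chan struct{})}
	go l.run(ctx)
	return l
}

func (l *loop) run(ctx context.Context) {
	defer close(l.done) // signal that everything owned by the loop is released
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// handle events here
		}
	}
}

// Close mirrors Coordinator.Close: cancel, then wait for the loop to exit.
func (l *loop) Close() error {
	l.cancel()
	<-l.done
	return nil
}

func main() {
	l := newLoop()
	time.Sleep(50 * time.Millisecond)
	_ = l.Close()
	fmt.Println("loop fully stopped")
}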
23 changes: 10 additions & 13 deletions v2/coord/query/query.go
@@ -9,6 +9,7 @@ import (
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"github.com/plprobelab/go-kademlia/key"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)
@@ -118,9 +119,13 @@ func NewQuery[K kad.Key[K], N kad.NodeID[K]](self N, id QueryID, target K, iter
}, nil
}

func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
ctx, span := tele.StartSpan(ctx, "Query.Advance")
defer span.End()
func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) (out QueryState) {
ctx, span := tele.StartSpan(ctx, "Query.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer func() {
span.SetAttributes(tele.AttrOutEvent(out))
span.End()
}()

if q.finished {
return &StateQueryFinished{
QueryID: q.id,
@@ -130,17 +135,14 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {

switch tev := ev.(type) {
case *EventQueryCancel:
span.SetAttributes(tele.AttrEvent("EventQueryCancel"))
q.markFinished()
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
}
case *EventQueryFindCloserResponse[K, N]:
span.SetAttributes(tele.AttrEvent("EventQueryFindCloserResponse"))
q.onMessageResponse(ctx, tev.NodeID, tev.CloserNodes)
case *EventQueryFindCloserFailure[K, N]:
span.SetAttributes(tele.AttrEvent("EventQueryFindCloserFailure"))
span.RecordError(tev.Error)
q.onMessageFailure(ctx, tev.NodeID)
case nil:
@@ -174,7 +176,6 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
q.inFlight--
q.stats.Failure++
} else if atCapacity() {
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span
returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
@@ -191,7 +192,6 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
// If it has contacted at least NumResults nodes successfully then the iteration is done.
if !progressing && successes >= q.cfg.NumResults {
q.markFinished()
span.SetAttributes(tele.AttrOutEvent("StateQueryFinished")) // this is the query's tracing span
returnState = &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
@@ -208,21 +208,20 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
if q.stats.Start.IsZero() {
q.stats.Start = q.cfg.Clock.Now()
}
span.SetAttributes(tele.AttrOutEvent("StateQueryFindCloser")) // this is the query's tracing span
returnState = &StateQueryFindCloser[K, N]{
NodeID: ni.NodeID,
QueryID: q.id,
Stats: q.stats,
Target: q.target,
}
return true

}
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingAtCapacity")) // this is the query's tracing span

returnState = &StateQueryWaitingAtCapacity{
QueryID: q.id,
Stats: q.stats,
}

return true
case *StateNodeUnresponsive:
// ignore
@@ -241,7 +240,6 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {

if q.inFlight > 0 {
// The iterator is still waiting for results and not at capacity
span.SetAttributes(tele.AttrOutEvent("StateQueryWaitingWithCapacity"))
return &StateQueryWaitingWithCapacity{
QueryID: q.id,
Stats: q.stats,
Expand All @@ -251,7 +249,6 @@ func (q *Query[K, N]) Advance(ctx context.Context, ev QueryEvent) QueryState {
// The iterator is finished because all available nodes have been contacted
// and the iterator is not waiting for any more results.
q.markFinished()
span.SetAttributes(tele.AttrOutEvent("StateQueryFinished"))
return &StateQueryFinished{
QueryID: q.id,
Stats: q.stats,
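The Advance refactor replaces the per-branch span.SetAttributes calls with a named return value and a single deferred SetAttributes, so the outgoing state is recorded exactly once on every return path, and the incoming event type is attached when the span starts. A reduced sketch of that pattern using plain OpenTelemetry calls; the event and state names are invented, and attribute.String stands in for the PR's tele.AttrInEvent and tele.AttrOutEvent helpers.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

type event interface{ isEvent() }
type cancelEvent struct{}
type pollEvent struct{}

func (cancelEvent) isEvent() {}
func (pollEvent) isEvent()   {}

// advance shows the named-return trick: every return path flows through the
// deferred function, which records the outgoing state as a span attribute.
func advance(ctx context.Context, ev event) (out string) {
	_, span := otel.Tracer("example").Start(ctx, "advance")
	span.SetAttributes(attribute.String("in_event", fmt.Sprintf("%T", ev)))
	defer func() {
		span.SetAttributes(attribute.String("out_state", out))
		span.End()
	}()

	switch ev.(type) {
	case cancelEvent:
		return "StateFinished"
	default:
		return "StateWaiting"
	}
}

func main() {
	fmt.Println(advance(context.Background(), cancelEvent{}))
	fmt.Println(advance(context.Background(), pollEvent{}))
}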
3 changes: 2 additions & 1 deletion v2/coord/routing.go
@@ -59,8 +59,9 @@ func (r *RoutingBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {

// notify must only be called while r.pendingMu is held
func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
ctx, span := r.tracer.Start(ctx, "RoutingBehaviour.notify")
ctx, span := r.tracer.Start(ctx, "RoutingBehaviour.notify", trace.WithAttributes(attribute.String("event", fmt.Sprintf("%T", ev))))
defer span.End()

switch ev := ev.(type) {
case *EventStartBootstrap:
span.SetAttributes(attribute.String("event", "EventStartBootstrap"))
10 changes: 3 additions & 7 deletions v2/coord/routing/bootstrap.go
@@ -9,6 +9,7 @@ import (
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/kaderr"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord/query"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
@@ -92,13 +93,11 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig[K]

// Advance advances the state of the bootstrap by attempting to advance its query if running.
func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState {
ctx, span := tele.StartSpan(ctx, "Bootstrap.Advance")
ctx, span := tele.StartSpan(ctx, "Bootstrap.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer span.End()

switch tev := ev.(type) {
case *EventBootstrapStart[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapStart"))

// TODO: ignore start event if query is already in progress
iter := query.NewClosestNodesIter[K, N](b.self.Key())

@@ -118,22 +117,19 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst
return b.advanceQuery(ctx, nil)

case *EventBootstrapFindCloserResponse[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserResponse"))
return b.advanceQuery(ctx, &query.EventQueryFindCloserResponse[K, N]{
NodeID: tev.NodeID,
CloserNodes: tev.CloserNodes,
})
case *EventBootstrapFindCloserFailure[K, N]:
span.SetAttributes(tele.AttrEvent("EventBootstrapFindCloserFailure"))
span.RecordError(tev.Error)
return b.advanceQuery(ctx, &query.EventQueryFindCloserFailure[K, N]{
NodeID: tev.NodeID,
Error: tev.Error,
})

case *EventBootstrapPoll:
span.SetAttributes(tele.AttrEvent("EventBootstrapPoll"))
// ignore, nothing to do
// ignore, nothing to do
default:
panic(fmt.Sprintf("unexpected event: %T", tev))
}
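Both the routing.go and bootstrap.go changes move the event type into the span at creation time: routing.go via trace.WithAttributes(attribute.String("event", fmt.Sprintf("%T", ev))), bootstrap.go via tele.StartSpan(..., trace.WithAttributes(tele.AttrInEvent(ev))), instead of setting an attribute in every case arm. Only the names StartSpan, AttrInEvent, and AttrOutEvent are visible in this diff; the sketch below is a hypothetical reconstruction of what such helpers could look like, not the PR's actual tele package.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// AttrInEvent and AttrOutEvent are hypothetical stand-ins for the tele
// helpers referenced in the diff: they record the concrete Go type of an
// incoming event or outgoing state as a span attribute.
func AttrInEvent(ev any) attribute.KeyValue {
	return attribute.String("in_event", fmt.Sprintf("%T", ev))
}

func AttrOutEvent(ev any) attribute.KeyValue {
	return attribute.String("out_event", fmt.Sprintf("%T", ev))
}

// StartSpan starts a span on a package-level tracer and forwards span start
// options such as trace.WithAttributes(AttrInEvent(ev)).
func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	return otel.Tracer("example").Start(ctx, name, opts...)
}

type eventBootstrapPoll struct{}

func main() {
	ctx := context.Background()
	ev := &eventBootstrapPoll{}
	// The caller attaches the incoming event once, at span creation, just as
	// Bootstrap.Advance does after this change.
	_, span := StartSpan(ctx, "Bootstrap.Advance", trace.WithAttributes(AttrInEvent(ev)))
	defer span.End()
	fmt.Println("span started with attribute:", AttrInEvent(ev).Value.AsString())
}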