Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example of a find peer query using go-kademlia #865

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions v2/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package dht

import (
"fmt"
"sync"

"github.com/libp2p/go-libp2p/core/host"
ma "github.com/multiformats/go-multiaddr"
"github.com/plprobelab/go-kademlia/coord"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/query"
"github.com/plprobelab/go-kademlia/routing/triert"
)

Expand All @@ -31,6 +33,13 @@ type DHT struct {
// rt holds a reference to the routing table implementation. This can be
// configured via the Config struct.
rt kad.RoutingTable[key.Key256, kad.NodeID[key.Key256]]

// --- go-kademlia specific fields below ---

// qSubs tracks a mapping of queries to their subscribers
qSubs map[query.QueryID]chan<- kad.Response[key.Key256, ma.Multiaddr]
qSubsLk sync.RWMutex
qSubCnt uint64
Comment on lines +40 to +42
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this would better handled in go-kademlia

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would be nicer but the compromise is that the logic for stopping the query in the event loop has to be injected

}

// New constructs a new DHT for the given underlying host and with the given
Expand All @@ -44,8 +53,9 @@ func New(h host.Host, cfg *Config) (*DHT, error) {
}

d := &DHT{
host: h,
cfg: cfg,
host: h,
cfg: cfg,
qSubs: make(map[query.QueryID]chan<- kad.Response[key.Key256, ma.Multiaddr]),
}

nid := nodeID(d.host.ID())
Expand Down
31 changes: 31 additions & 0 deletions v2/examples/findpeer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"context"
"fmt"

"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht/v2"
)

func main() {
ctx := context.Background()

h, err := libp2p.New(libp2p.NoListenAddrs)
if err != nil {
panic(err)
}

d, err := dht.New(h, dht.DefaultConfig())
if err != nil {
panic(err)
}

go d.Start(ctx)

addrInfo, err := d.FindPeer(ctx, "")
if err != nil {
panic(err)
}
fmt.Println(addrInfo)
}
29 changes: 29 additions & 0 deletions v2/protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package dht

import (
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/network/address"
)

var protoID = address.ProtocolID("/statemachine/1.0.0") // protocol ID for the test

type FindNodeRequest[K kad.Key[K], A kad.Address[A]] struct {
NodeID kad.NodeID[K]
}

func (r FindNodeRequest[K, A]) Target() K {
return r.NodeID.Key()
}

func (FindNodeRequest[K, A]) EmptyResponse() kad.Response[K, A] {
return &FindNodeResponse[K, A]{}
}

type FindNodeResponse[K kad.Key[K], A kad.Address[A]] struct {
NodeID kad.NodeID[K] // node we were looking for
CloserPeers []kad.NodeInfo[K, A]
}

func (r *FindNodeResponse[K, A]) CloserNodes() []kad.NodeInfo[K, A] {
return r.CloserPeers
}
103 changes: 95 additions & 8 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,129 @@ package dht

import (
"context"
"fmt"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
ma "github.com/multiformats/go-multiaddr"
"github.com/plprobelab/go-kademlia/coord"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/query"
)

func (d *DHT) Start(ctx context.Context) {
ctx, span := startSpan(ctx, "mainLoop")
defer span.End()

kadEvents := d.kad.Events()
for {
select {
case <-ctx.Done():
return
case ev := <-kadEvents:
switch tev := ev.(type) {
case *coord.KademliaOutboundQueryProgressedEvent[key.Key256, ma.Multiaddr]:
// TODO: locking
ch, ok := d.qSubs[tev.QueryID]
if !ok {
// we have lost the query waiter somehow
d.kad.StopQuery(ctx, tev.QueryID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StopQuery returns an error but in go-kademlia this is always nil

continue
}

// notify the waiter
ch <- tev.Response
}
}
}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.Routing = (*DHT)(nil)
)

func (d DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error {
func (d *DHT) Provide(ctx context.Context, cid cid.Cid, b bool) error {
panic("implement me")
}

func (d DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo {
func (d *DHT) FindProvidersAsync(ctx context.Context, cid cid.Cid, i int) <-chan peer.AddrInfo {
panic("implement me")
}

func (d DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
panic("implement me")
func (d *DHT) registerQuery() (query.QueryID, chan kad.Response[key.Key256, ma.Multiaddr]) {
ch := make(chan kad.Response[key.Key256, ma.Multiaddr])

d.qSubsLk.Lock()
d.qSubCnt += 1
qid := query.QueryID(fmt.Sprintf("query-%d", d.qSubCnt))
d.qSubs[qid] = ch
d.qSubsLk.Unlock()

return qid, ch
}

func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
// TODO: look in local peer store first

qid, ch := d.registerQuery()

nid := nodeID(d.host.ID())
err := d.kad.StartQuery(ctx, qid, protoID, &FindNodeRequest[key.Key256, ma.Multiaddr]{NodeID: nid})
if err != nil {
return peer.AddrInfo{}, fmt.Errorf("failed to start query: %w", err)
}

for {
select {
case <-ctx.Done():
d.kad.StopQuery(ctx, qid)
return peer.AddrInfo{}, ctx.Err()
case resp, ok := <-ch:
if !ok {
// channel was closed, so query can't progress
d.kad.StopQuery(ctx, qid)
return peer.AddrInfo{}, fmt.Errorf("query was unexpectedly stopped")
}

// we got a response from a message sent by query
switch resp := resp.(type) {
case *FindNodeResponse[key.Key256, ma.Multiaddr]:
// interpret the response
println("IpfsHandler.FindNode: got FindNode response")
for _, found := range resp.CloserPeers {
if key.Equal(found.ID().Key(), nid.Key()) {
// found the node we were looking for
d.kad.StopQuery(ctx, qid)
pid := peer.AddrInfo{
ID: peer.ID(found.ID().(nodeID)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cooler if we didn't need to do this type cast here

Addrs: found.Addresses(),
}
return pid, nil
}
}
default:
return peer.AddrInfo{}, fmt.Errorf("unknown response: %v", resp)
}
}
}
}

func (d DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error {
func (d *DHT) PutValue(ctx context.Context, s string, bytes []byte, option ...routing.Option) error {
panic("implement me")
}

func (d DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
func (d *DHT) GetValue(ctx context.Context, s string, option ...routing.Option) ([]byte, error) {
panic("implement me")
}

func (d DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
func (d *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
panic("implement me")
}

func (d DHT) Bootstrap(ctx context.Context) error {
func (d *DHT) Bootstrap(ctx context.Context) error {
panic("implement me")
}
13 changes: 13 additions & 0 deletions v2/tele.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dht

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

func startSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-libp2p-kad-dht").Start(ctx, fmt.Sprintf("Libp2pDHT.%s", name), opts...)
}
Loading