IpfsDHT -> KadDHT #568

Closed · wants to merge 1 commit
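Renaming an exported type breaks every downstream import site. Nothing in this diff adds a compatibility shim, but a conventional way to stage such a rename in Go is a deprecated type alias kept for a release cycle. A minimal sketch of that idea (not part of this PR):

```go
// Hypothetical compatibility shim, not included in this diff:
// a type alias keeps code that still references dht.IpfsDHT
// compiling while callers migrate to dht.KadDHT.

// Deprecated: use KadDHT instead.
type IpfsDHT = KadDHT
```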
82 changes: 41 additions & 41 deletions dht.go
@@ -50,9 +50,9 @@ const (
kad2 protocol.ID = "/kad/2.0.0"
)

-// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
+// KadDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
-type IpfsDHT struct {
+type KadDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore peerstore.Peerstore // Peer Registry
@@ -119,18 +119,18 @@ type IpfsDHT struct {
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
-_ routing.ContentRouting = (*IpfsDHT)(nil)
-_ routing.Routing = (*IpfsDHT)(nil)
-_ routing.PeerRouting = (*IpfsDHT)(nil)
-_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
-_ routing.ValueStore = (*IpfsDHT)(nil)
+_ routing.ContentRouting = (*KadDHT)(nil)
+_ routing.Routing = (*KadDHT)(nil)
+_ routing.PeerRouting = (*KadDHT)(nil)
+_ routing.PubKeyFetcher = (*KadDHT)(nil)
+_ routing.ValueStore = (*KadDHT)(nil)
)
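The var block above is Go's standard compile-time interface check: assigning a typed nil pointer to the blank identifier forces the compiler to prove *KadDHT satisfies each routing interface, at zero runtime cost. The idiom in isolation, as a self-contained sketch:

```go
package main

import "fmt"

// Pinger is a stand-in for the routing interfaces asserted above.
type Pinger interface {
	Ping() string
}

type node struct{}

func (n *node) Ping() string { return "pong" }

// Compile-time check: if *node ever stops satisfying Pinger,
// the build fails here; no test or runtime work is needed.
var _ Pinger = (*node)(nil)

func main() {
	var p Pinger = &node{}
	fmt.Println(p.Ping())
}
```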

// New creates a new DHT with the specified host and options.
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
we successfully get a query response from it OR if it sends us a query.
-func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
+func New(ctx context.Context, h host.Host, options ...Option) (*KadDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
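For orientation, a typical call site for the renamed constructor might look like the sketch below. It assumes an existing libp2p host `h` and uses the `Datastore` and `Mode` options that appear elsewhere in this diff; `ModeServer` is assumed to exist alongside the `ModeClient` seen below.

```go
// Sketch only: assumes h is a running libp2p host.Host and
// dstore is a ds.Batching implementation.
kdht, err := New(ctx, h,
	Datastore(dstore),
	Mode(ModeServer), // answer queries as well as issue them
)
if err != nil {
	return err
}
defer kdht.Close()
```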
@@ -202,7 +202,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
-func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
+func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, Datastore(dstore))
if err != nil {
panic(err)
@@ -214,15 +214,15 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
-func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
+func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
if err != nil {
panic(err)
}
return dht
}
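The two convenience constructors differ only in mode: NewDHT yields a full participant, while NewDHTClient passes Mode(ModeClient) so the node issues queries but never answers them. A short usage contrast, assuming `h` and `dstore` as above:

```go
// Full participant: registers stream handlers and answers
// FIND_NODE / GET_VALUE / GET_PROVIDERS requests from peers.
server := NewDHT(ctx, h, dstore)

// Query-only node: registers no DHT stream handlers, so it is
// invisible to other routing tables; handy for light clients.
client := NewDHTClient(ctx, h, dstore)
_, _ = server, client
```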

-func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
+func makeDHT(ctx context.Context, h host.Host, cfg config) (*KadDHT, error) {
var protocols, serverProtocols []protocol.ID

// check if custom test protocols were set
@@ -243,7 +243,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
serverProtocols = []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
}

-dht := &IpfsDHT{
+dht := &KadDHT{
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
@@ -282,7 +282,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

-func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
+func makeRoutingTable(dht *KadDHT, cfg config) (*kb.RoutingTable, error) {
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardry that produced these exact equations, please be patient as a document explaining it will
@@ -311,7 +311,7 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
-func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
+func (dht *KadDHT) fixLowPeersRoutine(proc goprocess.Process) {
for {
select {
case <-dht.fixLowPeersChan:
@@ -338,7 +338,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {

// TODO This is hacky, horrible and the programmer needs to have his mother called a hamster.
// SHOULD be removed once https://github.com/libp2p/go-libp2p/issues/800 goes in.
-func (dht *IpfsDHT) persistRTPeersInPeerStore() {
+func (dht *KadDHT) persistRTPeersInPeerStore() {
tickr := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
defer tickr.Stop()

@@ -356,7 +356,7 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() {
}

// putValueToPeer stores the given key/value pair at the peer 'p'
-func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
+func (dht *KadDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
@@ -379,7 +379,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
-func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
+func (dht *KadDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
@@ -411,13 +411,13 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
}

// getValueSingle simply performs the get value RPC with the given parameters
-func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
+func (dht *KadDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
return dht.sendRequest(ctx, p, pmes)
}

// getLocal attempts to retrieve the value from the datastore
-func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
+func (dht *KadDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", loggableKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
@@ -436,7 +436,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
-func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
+func (dht *KadDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", key)
@@ -448,7 +448,7 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
-func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
+func (dht *KadDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
logger.Debugw("peer found", "peer", p)
b, err := dht.validRTPeer(p)
if err != nil {
@@ -469,7 +469,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
}

// peerStoppedDHT signals the routing table that a peer is unable to respond to DHT queries anymore.
-func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
+func (dht *KadDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
@@ -479,15 +479,15 @@ func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
dht.fixRTIfNeeded()
}

-func (dht *IpfsDHT) fixRTIfNeeded() {
+func (dht *KadDHT) fixRTIfNeeded() {
select {
case dht.fixLowPeersChan <- struct{}{}:
default:
}
}
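fixRTIfNeeded is the classic non-blocking trigger: if a repair is already pending on fixLowPeersChan, the extra signal is dropped instead of queueing or blocking the caller. The pattern in isolation, as a runnable sketch (the buffered size-1 channel is an assumption about how fixLowPeersChan is constructed):

```go
package main

import "fmt"

func main() {
	// Coalescing trigger: many callers may signal, at most one
	// repair request stays queued at a time.
	trigger := make(chan struct{}, 1)

	requestRepair := func() {
		select {
		case trigger <- struct{}{}: // queued a repair
		default: // one already pending; drop this signal
		}
	}

	requestRepair()
	requestRepair() // coalesced with the first
	fmt.Println("pending repairs:", len(trigger)) // prints 1
}
```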

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
-func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
+func (dht *KadDHT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
@@ -497,24 +497,24 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
-func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
+func (dht *KadDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
return dht.sendRequest(ctx, p, pmes)
}

-func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
+func (dht *KadDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
return dht.sendRequest(ctx, p, pmes)
}

// nearestPeersToQuery returns the routing tables closest peers.
-func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
+func (dht *KadDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}
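nearestPeersToQuery works because kb.ConvertKey first hashes the record key into the same keyspace as peer IDs, so XOR distance is comparable between keys and peers. A usage sketch against an already-populated routing table `rt` (variable names are illustrative):

```go
// Ask the table for the 3 peers XOR-closest to the hashed key.
target := kb.ConvertKey("my-record-key")
for _, p := range rt.NearestPeers(target, 3) {
	fmt.Println("query candidate:", p)
}
```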

// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
-func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
+func (dht *KadDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
@@ -543,7 +543,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int
return filtered
}

-func (dht *IpfsDHT) setMode(m mode) error {
+func (dht *KadDHT) setMode(m mode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()

@@ -564,7 +564,7 @@
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
-func (dht *IpfsDHT) moveToServerMode() error {
+func (dht *KadDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
@@ -577,7 +577,7 @@
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
-func (dht *IpfsDHT) moveToClientMode() error {
+func (dht *KadDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
@@ -600,49 +600,49 @@ func (dht *IpfsDHT) moveToClientMode() error {
return nil
}

-func (dht *IpfsDHT) getMode() mode {
+func (dht *KadDHT) getMode() mode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}

// Context returns the dht's context
-func (dht *IpfsDHT) Context() context.Context {
+func (dht *KadDHT) Context() context.Context {
return dht.ctx
}

// Process returns the dht's process
-func (dht *IpfsDHT) Process() goprocess.Process {
+func (dht *KadDHT) Process() goprocess.Process {
return dht.proc
}

// RoutingTable returns the dht's routingTable
-func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
+func (dht *KadDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}

// Close calls Process Close
-func (dht *IpfsDHT) Close() error {
+func (dht *KadDHT) Close() error {
return dht.proc.Close()
}

func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
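mkDsKey exists because DHT record keys are arbitrary bytes, while datastore keys must be clean path components; RawStdEncoding (base32 without padding) gives a reversible, path-safe form. A self-contained illustration of the round trip:

```go
package main

import (
	"encoding/base32"
	"fmt"
)

func main() {
	raw := "/pk/\x00binary-key" // arbitrary bytes, unsafe in a path
	enc := base32.RawStdEncoding.EncodeToString([]byte(raw))
	fmt.Println("datastore key:", "/"+enc)

	dec, err := base32.RawStdEncoding.DecodeString(enc)
	fmt.Println("round trip ok:", err == nil && string(dec) == raw)
}
```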

-func (dht *IpfsDHT) PeerID() peer.ID {
+func (dht *KadDHT) PeerID() peer.ID {
return dht.self
}

-func (dht *IpfsDHT) PeerKey() []byte {
+func (dht *KadDHT) PeerKey() []byte {
return kb.ConvertPeerID(dht.self)
}

-func (dht *IpfsDHT) Host() host.Host {
+func (dht *KadDHT) Host() host.Host {
return dht.host
}

-func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
+func (dht *KadDHT) Ping(ctx context.Context, p peer.ID) error {
req := pb.NewMessage(pb.Message_PING, nil, 0)
resp, err := dht.sendRequest(ctx, p, req)
if err != nil {
@@ -657,7 +657,7 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
-func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
+func (dht *KadDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
15 changes: 8 additions & 7 deletions dht_bootstrap.go
@@ -3,9 +3,10 @@ package dht
import (
"context"
"fmt"
-"github.com/libp2p/go-libp2p-core/peer"
"time"
+
+"github.com/libp2p/go-libp2p-core/peer"

multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
@@ -42,7 +43,7 @@ func init() {
// startSelfLookup starts a go-routine that listens for requests to trigger a self walk on a dedicated channel
// and then sends the error status back on the error channel sent along with the request.
// if multiple callers "simultaneously" ask for a self walk, it performs ONLY one self walk and sends the same error status to all of them.
-func (dht *IpfsDHT) startSelfLookup() {
+func (dht *KadDHT) startSelfLookup() {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.WithProcessClosing(dht.ctx, proc)
for {
@@ -82,7 +83,7 @@
}
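As the comment above describes, concurrent self-walk requests are coalesced: the goroutine drains every waiting request channel, runs one walk, and broadcasts the single error status. A stripped-down sketch of that shape (runCoalesced and walk are illustrative names, not part of this package):

```go
// Hypothetical miniature of the coalescing loop: requests carry
// a channel on which the caller waits for the walk's outcome.
func runCoalesced(requests chan chan<- error, walk func() error) {
	for first := range requests {
		waiting := []chan<- error{first}
	gather: // pick up everyone who asked while we were idle
		for {
			select {
			case ch := <-requests:
				waiting = append(waiting, ch)
			default:
				break gather
			}
		}
		err := walk() // exactly one self walk for the batch
		for _, ch := range waiting {
			ch <- err // every waiter sees the same status
			close(ch)
		}
	}
}
```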

// Start the refresh worker.
-func (dht *IpfsDHT) startRefreshing() {
+func (dht *KadDHT) startRefreshing() {
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.WithProcessClosing(dht.ctx, proc)
@@ -156,7 +157,7 @@ func collectWaitingChannels(source chan chan<- error) []chan<- error {
}
}

-func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
+func (dht *KadDHT) doRefresh(ctx context.Context) error {
var merr error

// wait for the self walk result
@@ -184,7 +185,7 @@ func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
}

// refreshCpls scans the routing table, and does a random walk for cpl's that haven't been queried since the given period
-func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
+func (dht *KadDHT) refreshCpls(ctx context.Context) error {
doQuery := func(cpl uint, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)",
cpl, target, dht.routingTable.Size())
@@ -236,7 +237,7 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
// IpfsRouter interface.
//
// This just calls `RefreshRoutingTable`.
-func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
+func (dht *KadDHT) Bootstrap(_ context.Context) error {
dht.RefreshRoutingTable()
return nil
}
@@ -245,7 +246,7 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
-func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
+func (dht *KadDHT) RefreshRoutingTable() <-chan error {
res := make(chan error, 1)
select {
case dht.triggerRtRefresh <- res:
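Because the channel returned by RefreshRoutingTable is buffered, both fire-and-forget and blocking use are safe, as the doc comment promises. A usage sketch, assuming `kdht` is a running instance:

```go
// Fire-and-forget: nobody ever has to drain the buffered channel.
kdht.RefreshRoutingTable()

// Or block on a separate refresh and inspect the outcome.
if err := <-kdht.RefreshRoutingTable(); err != nil {
	fmt.Printf("refresh failed: %v\n", err)
}
```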
6 changes: 3 additions & 3 deletions dht_bootstrap_test.go
@@ -19,7 +19,7 @@ func TestSelfWalkOnAddressChange(t *testing.T) {
d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
d3 := setupDHT(ctx, t, false, DisableAutoRefresh())

-var connectedTo *IpfsDHT
+var connectedTo *KadDHT
// connect d1 to whoever is "further"
if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <=
kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) {
@@ -34,14 +34,14 @@
connect(t, ctx, d2, d3)

// d1 should have ONLY 1 peer in its RT
-waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second)
+waitForWellFormedTables(t, []*KadDHT{d1}, 1, 1, 2*time.Second)
require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0])

// now emit the address change event
em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
require.NoError(t, err)
require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{}))
-waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second)
+waitForWellFormedTables(t, []*KadDHT{d1}, 2, 2, 2*time.Second)
// it should now have both peers in the RT
ps := d1.routingTable.ListPeers()
require.Contains(t, ps, d2.self)
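waitForWellFormedTables is a test helper defined elsewhere in the package; from the call sites above it evidently polls until every routing table size falls within [minPeers, maxPeers] or the timeout expires. A hypothetical reconstruction of that shape, for readers without the full source:

```go
// Hypothetical sketch; the real helper lives in the package's
// test utilities and may differ in detail.
func waitForWellFormedTables(t *testing.T, dhts []*KadDHT, minPeers, maxPeers int, timeout time.Duration) {
	t.Helper()
	deadline := time.After(timeout)
	for {
		ok := true
		for _, d := range dhts {
			if sz := d.routingTable.Size(); sz < minPeers || sz > maxPeers {
				ok = false
				break
			}
		}
		if ok {
			return
		}
		select {
		case <-deadline:
			t.Fatalf("routing tables not well-formed within %s", timeout)
		case <-time.After(50 * time.Millisecond):
		}
	}
}
```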