Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change IpfsDHT to KadDHT #702

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 49 additions & 46 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ type addPeerRTReq struct {
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// IpfsDHT is an alias for KadDHT.
type IpfsDHT KadDHT

// KadDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
type KadDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
selfKey kb.ID
Expand Down Expand Up @@ -151,18 +154,18 @@ type IpfsDHT struct {
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
_ routing.Routing = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
_ routing.ContentRouting = (*KadDHT)(nil)
_ routing.Routing = (*KadDHT)(nil)
_ routing.PeerRouting = (*KadDHT)(nil)
_ routing.PubKeyFetcher = (*KadDHT)(nil)
_ routing.ValueStore = (*KadDHT)(nil)
)

// New creates a new DHT with the specified host and options.
// Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table.
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
func New(ctx context.Context, h host.Host, options ...Option) (*KadDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
Expand Down Expand Up @@ -244,9 +247,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// KadDHT's initialized with this function will respond to DHT requests,
// whereas KadDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, Datastore(dstore))
if err != nil {
panic(err)
Expand All @@ -255,17 +258,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// host. KadDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT {
dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
if err != nil {
panic(err)
}
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
func makeDHT(ctx context.Context, h host.Host, cfg config) (*KadDHT, error) {
var protocols, serverProtocols []protocol.ID

v1proto := cfg.protocolPrefix + kad1
Expand All @@ -277,7 +280,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
protocols = []protocol.ID{v1proto}
serverProtocols = []protocol.ID{v1proto}

dht := &IpfsDHT{
dht := &KadDHT{
datastore: cfg.datastore,
self: h.ID(),
selfKey: kb.ConvertPeerID(h.ID()),
Expand Down Expand Up @@ -351,7 +354,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
func makeRtRefreshManager(dht *KadDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
Expand All @@ -374,7 +377,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
func makeRoutingTable(dht *KadDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
// make a Routing Table Diversity Filter
var filter *peerdiversity.Filter
if dht.rtPeerDiversityFilter != nil {
Expand Down Expand Up @@ -416,16 +419,16 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
func (dht *KadDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return dht.routingTable.GetDiversityStats()
}

// Mode allows introspection of the operation mode of the DHT
func (dht *IpfsDHT) Mode() ModeOpt {
func (dht *KadDHT) Mode() ModeOpt {
return dht.auto
}

func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
func (dht *KadDHT) populatePeers(_ goprocess.Process) {
if !dht.disableFixLowPeers {
dht.fixLowPeers(dht.ctx)
}
Expand All @@ -442,7 +445,7 @@ func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
}

// fixLowPeersRouting manages simultaneous requests to fixLowPeers
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
func (dht *KadDHT) fixLowPeersRoutine(proc goprocess.Process) {
ticker := time.NewTicker(periodicBootstrapInterval)
defer ticker.Stop()

Expand All @@ -460,7 +463,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
func (dht *KadDHT) fixLowPeers(ctx context.Context) {
if dht.routingTable.Size() > minRTRefreshThreshold {
return
}
Expand Down Expand Up @@ -519,7 +522,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {

// TODO This is hacky, horrible and the programmer needs to have his mother called a hamster.
// SHOULD be removed once https://github.com/libp2p/go-libp2p/issues/800 goes in.
func (dht *IpfsDHT) persistRTPeersInPeerStore() {
func (dht *KadDHT) persistRTPeersInPeerStore() {
tickr := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3)
defer tickr.Stop()

Expand All @@ -540,7 +543,7 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() {
//
// returns nil, nil when either nothing is found or the value found doesn't properly validate.
// returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong)
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
func (dht *KadDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key))

rec, err := dht.getRecordFromDatastore(mkDsKey(key))
Expand All @@ -559,7 +562,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
func (dht *KadDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key))
Expand All @@ -569,7 +572,7 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
return dht.datastore.Put(mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
func (dht *KadDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootsrapping := false
var timerCh <-chan time.Time
Expand Down Expand Up @@ -626,7 +629,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
// LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
func (dht *KadDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}
Expand All @@ -643,22 +646,22 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
}

// peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
func (dht *KadDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Debugw("peer stopped dht", "peer", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to anymore till it starts supporting the DHT protocol again.
dht.routingTable.RemovePeer(p)
}

func (dht *IpfsDHT) fixRTIfNeeded() {
func (dht *KadDHT) fixRTIfNeeded() {
select {
case dht.fixLowPeersChan <- struct{}{}:
default:
}
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
func (dht *KadDHT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
Expand All @@ -668,13 +671,13 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
func (dht *KadDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}

// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
func (dht *KadDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand Down Expand Up @@ -703,7 +706,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int
return filtered
}

func (dht *IpfsDHT) setMode(m mode) error {
func (dht *KadDHT) setMode(m mode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()

Expand All @@ -724,7 +727,7 @@ func (dht *IpfsDHT) setMode(m mode) error {
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToServerMode() error {
func (dht *KadDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
Expand All @@ -737,7 +740,7 @@ func (dht *IpfsDHT) moveToServerMode() error {
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToClientMode() error {
func (dht *KadDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
Expand All @@ -760,29 +763,29 @@ func (dht *IpfsDHT) moveToClientMode() error {
return nil
}

func (dht *IpfsDHT) getMode() mode {
func (dht *KadDHT) getMode() mode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}

// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
func (dht *KadDHT) Context() context.Context {
return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
func (dht *KadDHT) Process() goprocess.Process {
return dht.proc
}

// RoutingTable returns the DHT's routingTable.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
func (dht *KadDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}

// Close calls Process Close.
func (dht *IpfsDHT) Close() error {
func (dht *KadDHT) Close() error {
return dht.proc.Close()
}

Expand All @@ -791,29 +794,29 @@ func mkDsKey(s string) ds.Key {
}

// PeerID returns the DHT node's Peer ID.
func (dht *IpfsDHT) PeerID() peer.ID {
func (dht *KadDHT) PeerID() peer.ID {
return dht.self
}

// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
func (dht *IpfsDHT) PeerKey() []byte {
func (dht *KadDHT) PeerKey() []byte {
return kb.ConvertPeerID(dht.self)
}

// Host returns the libp2p host this DHT is operating with.
func (dht *IpfsDHT) Host() host.Host {
func (dht *KadDHT) Host() host.Host {
return dht.host
}

// Ping sends a ping message to the passed peer and waits for a response.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
func (dht *KadDHT) Ping(ctx context.Context, p peer.ID) error {
return dht.protoMessenger.Ping(ctx, p)
}

// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. It will also take any extra tags that need adding to
// the context as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
func (dht *KadDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
Expand All @@ -826,7 +829,7 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta
return ctx
}

func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
func (dht *KadDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// Don't add addresses for self or our connected peers. We have better ones.
if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected {
return
Expand Down
6 changes: 3 additions & 3 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo {

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
func (dht *KadDHT) Bootstrap(ctx context.Context) error {
dht.fixRTIfNeeded()
dht.rtRefreshManager.RefreshNoWait()
return nil
Expand All @@ -67,7 +67,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
func (dht *KadDHT) RefreshRoutingTable() <-chan error {
return dht.rtRefreshManager.Refresh(false)
}

Expand All @@ -76,6 +76,6 @@ func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) ForceRefresh() <-chan error {
func (dht *KadDHT) ForceRefresh() <-chan error {
return dht.rtRefreshManager.Refresh(true)
}
Loading