From e9e07e0489e67bfe8dbc06d7c861eae747fb6a88 Mon Sep 17 00:00:00 2001 From: endoffile78 Date: Mon, 11 Jan 2021 22:31:20 -0600 Subject: [PATCH] Rename IpfsDHT to KadDHT --- dht.go | 95 +++++++++++++++++++------------------ dht_bootstrap.go | 6 +-- dht_bootstrap_test.go | 16 +++---- dht_filters.go | 12 ++--- dht_net.go | 4 +- dht_options.go | 4 +- dht_test.go | 42 ++++++++-------- dual/dual.go | 6 +-- dual/dual_test.go | 10 ++-- handlers.go | 18 +++---- lookup.go | 2 +- query.go | 8 ++-- query_test.go | 2 +- records.go | 6 +-- routing.go | 30 ++++++------ rt_diversity_filter_test.go | 10 ++-- subscriber_notifee.go | 10 ++-- 17 files changed, 142 insertions(+), 139 deletions(-) diff --git a/dht.go b/dht.go index 7c89ab56a..2bef4aea0 100644 --- a/dht.go +++ b/dht.go @@ -71,9 +71,12 @@ type addPeerRTReq struct { queryPeer bool } -// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. +// IpfsDHT is an alias for KadDHT. +type IpfsDHT KadDHT + +// KadDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. -type IpfsDHT struct { +type KadDHT struct { host host.Host // the network services we need self peer.ID // Local peer (yourself) selfKey kb.ID @@ -151,18 +154,18 @@ type IpfsDHT struct { // Assert that IPFS assumptions about interfaces aren't broken. These aren't a // guarantee, but we can use them to aid refactoring. var ( - _ routing.ContentRouting = (*IpfsDHT)(nil) - _ routing.Routing = (*IpfsDHT)(nil) - _ routing.PeerRouting = (*IpfsDHT)(nil) - _ routing.PubKeyFetcher = (*IpfsDHT)(nil) - _ routing.ValueStore = (*IpfsDHT)(nil) + _ routing.ContentRouting = (*KadDHT)(nil) + _ routing.Routing = (*KadDHT)(nil) + _ routing.PeerRouting = (*KadDHT)(nil) + _ routing.PubKeyFetcher = (*KadDHT)(nil) + _ routing.ValueStore = (*KadDHT)(nil) ) // New creates a new DHT with the specified host and options. // Please note that being connected to a DHT peer does not necessarily imply that it's also in the DHT Routing Table. // If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when // we successfully get a query response from it OR if it send us a query. -func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) { +func New(ctx context.Context, h host.Host, options ...Option) (*KadDHT, error) { var cfg config if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil { return nil, err @@ -244,9 +247,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) } // NewDHT creates a new DHT object with the given peer as the 'local' host. -// IpfsDHT's initialized with this function will respond to DHT requests, -// whereas IpfsDHT's initialized with NewDHTClient will not. -func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { +// KadDHT's initialized with this function will respond to DHT requests, +// whereas KadDHT's initialized with NewDHTClient will not. +func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT { dht, err := New(ctx, h, Datastore(dstore)) if err != nil { panic(err) @@ -255,9 +258,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { } // NewDHTClient creates a new DHT object with the given peer as the 'local' -// host. IpfsDHT clients initialized with this function will not respond to DHT +// host. KadDHT clients initialized with this function will not respond to DHT // requests. 
If you need a peer to respond to DHT requests, use NewDHT instead. -func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { +func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *KadDHT { dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient)) if err != nil { panic(err) @@ -265,7 +268,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT return dht } -func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { +func makeDHT(ctx context.Context, h host.Host, cfg config) (*KadDHT, error) { var protocols, serverProtocols []protocol.ID v1proto := cfg.protocolPrefix + kad1 @@ -277,7 +280,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { protocols = []protocol.ID{v1proto} serverProtocols = []protocol.ID{v1proto} - dht := &IpfsDHT{ + dht := &KadDHT{ datastore: cfg.datastore, self: h.ID(), selfKey: kb.ConvertPeerID(h.ID()), @@ -351,7 +354,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { return dht, nil } -func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) { +func makeRtRefreshManager(dht *KadDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) { keyGenFnc := func(cpl uint) (string, error) { p, err := dht.routingTable.GenRandPeerID(cpl) return string(p), err @@ -374,7 +377,7 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr return r, err } -func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) { +func makeRoutingTable(dht *KadDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) { // make a Routing Table Diversity Filter var filter *peerdiversity.Filter if dht.rtPeerDiversityFilter != nil { @@ -416,16 +419,16 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho } // GetRoutingTableDiversityStats returns the diversity stats for the Routing Table. 
-func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { +func (dht *KadDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats { return dht.routingTable.GetDiversityStats() } // Mode allows introspection of the operation mode of the DHT -func (dht *IpfsDHT) Mode() ModeOpt { +func (dht *KadDHT) Mode() ModeOpt { return dht.auto } -func (dht *IpfsDHT) populatePeers(_ goprocess.Process) { +func (dht *KadDHT) populatePeers(_ goprocess.Process) { if !dht.disableFixLowPeers { dht.fixLowPeers(dht.ctx) } @@ -442,7 +445,7 @@ func (dht *IpfsDHT) populatePeers(_ goprocess.Process) { } // fixLowPeersRouting manages simultaneous requests to fixLowPeers -func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) { +func (dht *KadDHT) fixLowPeersRoutine(proc goprocess.Process) { ticker := time.NewTicker(periodicBootstrapInterval) defer ticker.Stop() @@ -460,7 +463,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) { } // fixLowPeers tries to get more peers into the routing table if we're below the threshold -func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { +func (dht *KadDHT) fixLowPeers(ctx context.Context) { if dht.routingTable.Size() > minRTRefreshThreshold { return } @@ -519,7 +522,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) { // TODO This is hacky, horrible and the programmer needs to have his mother called a hamster. // SHOULD be removed once https://github.com/libp2p/go-libp2p/issues/800 goes in. -func (dht *IpfsDHT) persistRTPeersInPeerStore() { +func (dht *KadDHT) persistRTPeersInPeerStore() { tickr := time.NewTicker(peerstore.RecentlyConnectedAddrTTL / 3) defer tickr.Stop() @@ -540,7 +543,7 @@ func (dht *IpfsDHT) persistRTPeersInPeerStore() { // // returns nil, nil when either nothing is found or the value found doesn't properly validate. // returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong) -func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { +func (dht *KadDHT) getLocal(key string) (*recpb.Record, error) { logger.Debugw("finding value in datastore", "key", internal.LoggableRecordKeyString(key)) rec, err := dht.getRecordFromDatastore(mkDsKey(key)) @@ -559,7 +562,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { } // putLocal stores the key value pair in the datastore -func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { +func (dht *KadDHT) putLocal(key string, rec *recpb.Record) error { data, err := proto.Marshal(rec) if err != nil { logger.Warnw("failed to put marshal record for local put", "error", err, "key", internal.LoggableRecordKeyString(key)) @@ -569,7 +572,7 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { return dht.datastore.Put(mkDsKey(key), data) } -func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { +func (dht *KadDHT) rtPeerLoop(proc goprocess.Process) { bootstrapCount := 0 isBootsrapping := false var timerCh <-chan time.Time @@ -626,7 +629,7 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) { // LastUsefulAt remains unchanged // If we connect to a peer we already have in the RT but do not exchange a query (rare) // Do Nothing. 
-func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { +func (dht *KadDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil { c.Write(zap.String("peer", p.String())) } @@ -643,14 +646,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) { } // peerStoppedDHT signals the routing table that a peer is unable to responsd to DHT queries anymore. -func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) { +func (dht *KadDHT) peerStoppedDHT(ctx context.Context, p peer.ID) { logger.Debugw("peer stopped dht", "peer", p) // A peer that does not support the DHT protocol is dead for us. // There's no point in talking to anymore till it starts supporting the DHT protocol again. dht.routingTable.RemovePeer(p) } -func (dht *IpfsDHT) fixRTIfNeeded() { +func (dht *KadDHT) fixRTIfNeeded() { select { case dht.fixLowPeersChan <- struct{}{}: default: @@ -658,7 +661,7 @@ func (dht *IpfsDHT) fixRTIfNeeded() { } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. -func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo { +func (dht *KadDHT) FindLocal(id peer.ID) peer.AddrInfo { switch dht.host.Network().Connectedness(id) { case network.Connected, network.CanConnect: return dht.peerstore.PeerInfo(id) @@ -668,13 +671,13 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo { } // nearestPeersToQuery returns the routing tables closest peers. -func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { +func (dht *KadDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID { closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count) return closer } // betterPeersToQuery returns nearestPeersToQuery with some additional filtering -func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID { +func (dht *KadDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int) []peer.ID { closer := dht.nearestPeersToQuery(pmes, count) // no node? nil @@ -703,7 +706,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, from peer.ID, count int return filtered } -func (dht *IpfsDHT) setMode(m mode) error { +func (dht *KadDHT) setMode(m mode) error { dht.modeLk.Lock() defer dht.modeLk.Unlock() @@ -724,7 +727,7 @@ func (dht *IpfsDHT) setMode(m mode) error { // moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers. // Note: We may support responding to queries with protocols aside from our primary ones in order to support // interoperability with older versions of the DHT protocol. -func (dht *IpfsDHT) moveToServerMode() error { +func (dht *KadDHT) moveToServerMode() error { dht.mode = modeServer for _, p := range dht.serverProtocols { dht.host.SetStreamHandler(p, dht.handleNewStream) @@ -737,7 +740,7 @@ func (dht *IpfsDHT) moveToServerMode() error { // utilizing the handled protocols. // Note: We may support responding to queries with protocols aside from our primary ones in order to support // interoperability with older versions of the DHT protocol. 
-func (dht *IpfsDHT) moveToClientMode() error { +func (dht *KadDHT) moveToClientMode() error { dht.mode = modeClient for _, p := range dht.serverProtocols { dht.host.RemoveStreamHandler(p) @@ -760,29 +763,29 @@ func (dht *IpfsDHT) moveToClientMode() error { return nil } -func (dht *IpfsDHT) getMode() mode { +func (dht *KadDHT) getMode() mode { dht.modeLk.Lock() defer dht.modeLk.Unlock() return dht.mode } // Context returns the DHT's context. -func (dht *IpfsDHT) Context() context.Context { +func (dht *KadDHT) Context() context.Context { return dht.ctx } // Process returns the DHT's process. -func (dht *IpfsDHT) Process() goprocess.Process { +func (dht *KadDHT) Process() goprocess.Process { return dht.proc } // RoutingTable returns the DHT's routingTable. -func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable { +func (dht *KadDHT) RoutingTable() *kb.RoutingTable { return dht.routingTable } // Close calls Process Close. -func (dht *IpfsDHT) Close() error { +func (dht *KadDHT) Close() error { return dht.proc.Close() } @@ -791,29 +794,29 @@ func mkDsKey(s string) ds.Key { } // PeerID returns the DHT node's Peer ID. -func (dht *IpfsDHT) PeerID() peer.ID { +func (dht *KadDHT) PeerID() peer.ID { return dht.self } // PeerKey returns a DHT key, converted from the DHT node's Peer ID. -func (dht *IpfsDHT) PeerKey() []byte { +func (dht *KadDHT) PeerKey() []byte { return kb.ConvertPeerID(dht.self) } // Host returns the libp2p host this DHT is operating with. -func (dht *IpfsDHT) Host() host.Host { +func (dht *KadDHT) Host() host.Host { return dht.host } // Ping sends a ping message to the passed peer and waits for a response. -func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error { +func (dht *KadDHT) Ping(ctx context.Context, p peer.ID) error { return dht.protoMessenger.Ping(ctx, p) } // newContextWithLocalTags returns a new context.Context with the InstanceID and // PeerID keys populated. It will also take any extra tags that need adding to // the context as tag.Mutators. -func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context { +func (dht *KadDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context { extraTags = append( extraTags, tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()), @@ -826,7 +829,7 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta return ctx } -func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { +func (dht *KadDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { // Don't add addresses for self or our connected peers. We have better ones. if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected { return diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 72d133af5..7aaeb59c7 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -57,7 +57,7 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo { // Bootstrap tells the DHT to get into a bootstrapped state satisfying the // IpfsRouter interface. -func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { +func (dht *KadDHT) Bootstrap(ctx context.Context) error { dht.fixRTIfNeeded() dht.rtRefreshManager.RefreshNoWait() return nil @@ -67,7 +67,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { // // The returned channel will block until the refresh finishes, then yield the // error and close. The channel is buffered and safe to ignore. 
-func (dht *IpfsDHT) RefreshRoutingTable() <-chan error { +func (dht *KadDHT) RefreshRoutingTable() <-chan error { return dht.rtRefreshManager.Refresh(false) } @@ -76,6 +76,6 @@ func (dht *IpfsDHT) RefreshRoutingTable() <-chan error { // // The returned channel will block until the refresh finishes, then yield the // error and close. The channel is buffered and safe to ignore. -func (dht *IpfsDHT) ForceRefresh() <-chan error { +func (dht *KadDHT) ForceRefresh() <-chan error { return dht.rtRefreshManager.Refresh(true) } diff --git a/dht_bootstrap_test.go b/dht_bootstrap_test.go index 8ea9c3609..bf0e82ea6 100644 --- a/dht_bootstrap_test.go +++ b/dht_bootstrap_test.go @@ -19,7 +19,7 @@ func TestSelfWalkOnAddressChange(t *testing.T) { d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) - var connectedTo *IpfsDHT + var connectedTo *KadDHT // connect d1 to whoever is "further" if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <= kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) { @@ -34,14 +34,14 @@ func TestSelfWalkOnAddressChange(t *testing.T) { connect(t, ctx, d2, d3) // d1 should have ONLY 1 peer in it's RT - waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second) + waitForWellFormedTables(t, []*KadDHT{d1}, 1, 1, 2*time.Second) require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0]) // now emit the address change event em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{}) require.NoError(t, err) require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{})) - waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second) + waitForWellFormedTables(t, []*KadDHT{d1}, 2, 2, 2*time.Second) // it should now have both peers in the RT ps := d1.routingTable.ListPeers() require.Contains(t, ps, d2.self) @@ -80,8 +80,8 @@ func TestBootstrappersReplacable(t *testing.T) { defer d.host.Close() defer d.Close() - var d1 *IpfsDHT - var d2 *IpfsDHT + var d1 *KadDHT + var d2 *KadDHT // d1 & d2 have a cpl of 0 for { @@ -108,8 +108,8 @@ func TestBootstrappersReplacable(t *testing.T) { require.Len(t, d.routingTable.ListPeers(), 2) // d3 & d4 with cpl=0 will go in as d1 & d2 are replacable. - var d3 *IpfsDHT - var d4 *IpfsDHT + var d3 *KadDHT + var d4 *KadDHT for { d3 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) @@ -142,7 +142,7 @@ func TestBootstrappersReplacable(t *testing.T) { time.Sleep(1 * time.Second) // adding d5 fails because RT is frozen - var d5 *IpfsDHT + var d5 *KadDHT for { d5 = setupDHT(ctx, t, false, disableFixLowPeersRoutine(t)) if kb.CommonPrefixLen(d.selfKey, d5.selfKey) == 0 { diff --git a/dht_filters.go b/dht_filters.go index 655b86f1d..d667c5b5e 100644 --- a/dht_filters.go +++ b/dht_filters.go @@ -17,11 +17,11 @@ import ( ) // QueryFilterFunc is a filter applied when considering peers to dial when querying -type QueryFilterFunc func(dht *IpfsDHT, ai peer.AddrInfo) bool +type QueryFilterFunc func(dht *KadDHT, ai peer.AddrInfo) bool // RouteTableFilterFunc is a filter applied when considering connections to keep in // the local route table. 
-type RouteTableFilterFunc func(dht *IpfsDHT, conns []network.Conn) bool +type RouteTableFilterFunc func(dht *KadDHT, conns []network.Conn) bool var publicCIDR6 = "2000::/3" var public6 *net.IPNet @@ -59,7 +59,7 @@ func isPrivateAddr(a ma.Multiaddr) bool { } // PublicQueryFilter returns true if the peer is suspected of being publicly accessible -func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { +func PublicQueryFilter(_ *KadDHT, ai peer.AddrInfo) bool { if len(ai.Addrs) == 0 { return false } @@ -77,7 +77,7 @@ var _ QueryFilterFunc = PublicQueryFilter // PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate // that it is on a public network -func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool { +func PublicRoutingTableFilter(dht *KadDHT, conns []network.Conn) bool { if len(conns) == 0 { return false } @@ -97,7 +97,7 @@ func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool { var _ RouteTableFilterFunc = PublicRoutingTableFilter // PrivateQueryFilter doens't currently restrict which peers we are willing to query from the local DHT. -func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool { +func PrivateQueryFilter(dht *KadDHT, ai peer.AddrInfo) bool { return len(ai.Addrs) > 0 } @@ -137,7 +137,7 @@ func getCachedRouter() routing.Router { // PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate // that it is on a private network -func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool { +func PrivateRoutingTableFilter(dht *KadDHT, conns []network.Conn) bool { router := getCachedRouter() myAdvertisedIPs := make([]net.IP, 0) for _, a := range dht.Host().Addrs() { diff --git a/dht_net.go b/dht_net.go index 278216625..839f07027 100644 --- a/dht_net.go +++ b/dht_net.go @@ -60,7 +60,7 @@ func (w *bufferedDelimitedWriter) Flush() error { } // handleNewStream implements the network.StreamHandler -func (dht *IpfsDHT) handleNewStream(s network.Stream) { +func (dht *KadDHT) handleNewStream(s network.Stream) { if dht.handleNewMessage(s) { // If we exited without error, close gracefully. _ = s.Close() @@ -71,7 +71,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) { } // Returns true on orderly completion of writes (so we can Close the stream). 
-func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { +func (dht *KadDHT) handleNewMessage(s network.Stream) bool { ctx := dht.ctx r := msgio.NewVarintReaderSize(s, network.MessageSizeMax) diff --git a/dht_options.go b/dht_options.go index 820145451..edf28329f 100644 --- a/dht_options.go +++ b/dht_options.go @@ -71,8 +71,8 @@ type config struct { testAddressUpdateProcessing bool } -func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true } -func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool { return true } +func emptyQueryFilter(_ *KadDHT, ai peer.AddrInfo) bool { return true } +func emptyRTFilter(_ *KadDHT, conns []network.Conn) bool { return true } // apply applies the given options to this Option func (c *config) apply(opts ...Option) error { diff --git a/dht_test.go b/dht_test.go index 74835c8e2..43dc88e45 100644 --- a/dht_test.go +++ b/dht_test.go @@ -103,7 +103,7 @@ func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) { var testPrefix = ProtocolPrefix("/test") -func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option) *KadDHT { baseOpts := []Option{ testPrefix, NamespacedValidator("v", blankValidator{}), @@ -127,9 +127,9 @@ func setupDHT(ctx context.Context, t *testing.T, client bool, options ...Option) return d } -func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*IpfsDHT { +func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*KadDHT { addrs := make([]ma.Multiaddr, n) - dhts := make([]*IpfsDHT, n) + dhts := make([]*KadDHT, n) peers := make([]peer.ID, n) sanityAddrsMap := make(map[string]struct{}) @@ -155,7 +155,7 @@ func setupDHTS(t *testing.T, ctx context.Context, n int, options ...Option) []*I return dhts } -func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { +func connectNoSync(t *testing.T, ctx context.Context, a, b *KadDHT) { t.Helper() idB := b.self @@ -171,7 +171,7 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { } } -func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) { +func wait(t *testing.T, ctx context.Context, a, b *KadDHT) { t.Helper() // loop until connection notification has been received. @@ -185,14 +185,14 @@ func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) { } } -func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { +func connect(t *testing.T, ctx context.Context, a, b *KadDHT) { t.Helper() connectNoSync(t, ctx, a, b) wait(t, ctx, a, b) wait(t, ctx, b, a) } -func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { +func bootstrap(t *testing.T, ctx context.Context, dhts []*KadDHT) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -263,7 +263,7 @@ func TestValueGetSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var dhts [5]*IpfsDHT + var dhts [5]*KadDHT for i := range dhts { dhts[i] = setupDHT(ctx, t, false) @@ -671,7 +671,7 @@ func TestLocalProvides(t *testing.T) { } // if minPeers or avgPeers is 0, dont test for it. 
-func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) { +func waitForWellFormedTables(t *testing.T, dhts []*KadDHT, minPeers, avgPeers int, timeout time.Duration) { // test "well-formed-ness" (>= minPeers peers in every routing table) t.Helper() @@ -709,7 +709,7 @@ func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers i } } -func printRoutingTables(dhts []*IpfsDHT) { +func printRoutingTables(dhts []*KadDHT) { // the routing tables should be full now. let's inspect them. fmt.Printf("checking routing table of %d\n", len(dhts)) for _, dht := range dhts { @@ -804,7 +804,7 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) { // we ONLY init bootstrap on A dhtA.RefreshRoutingTable() // and wait for one round to complete i.e. A should be connected to both B & C - waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second) + waitForWellFormedTables(t, []*KadDHT{dhtA}, 2, 2, 20*time.Second) // now we create two new peers dhtD := setupDHT(ctx, t, false) @@ -826,7 +826,7 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) { connect(t, ctx, dhtA, dhtD) // and because of the above bootstrap, A also discovers E ! - waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second) + waitForWellFormedTables(t, []*KadDHT{dhtA}, 4, 4, 20*time.Second) assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!") } @@ -907,7 +907,7 @@ func TestPeriodicRefresh(t *testing.T) { var wg sync.WaitGroup for _, dht := range dhts { wg.Add(1) - go func(d *IpfsDHT) { + go func(d *KadDHT) { <-d.RefreshRoutingTable() wg.Done() }(dht) @@ -983,7 +983,7 @@ func TestProvidesMany(t *testing.T) { defer cancel() var wg sync.WaitGroup - getProvider := func(dht *IpfsDHT, k cid.Cid) { + getProvider := func(dht *KadDHT, k cid.Cid) { defer wg.Done() expected := providers[k] @@ -1422,7 +1422,7 @@ func testFindPeerQuery(t *testing.T, var wg sync.WaitGroup for _, dht := range dhts { wg.Add(1) - go func(d *IpfsDHT) { + go func(d *KadDHT) { <-d.RefreshRoutingTable() wg.Done() }(dht) @@ -1516,7 +1516,7 @@ func TestFixLowPeers(t *testing.T) { require.NoError(t, mainD.Host().Connect(ctx, peer.AddrInfo{ID: d.self})) } - waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold+4, 5*time.Second) + waitForWellFormedTables(t, []*KadDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold+4, 5*time.Second) // run a refresh on all of them for _, d := range dhts { @@ -1530,7 +1530,7 @@ func TestFixLowPeers(t *testing.T) { } // but we will still get enough peers in the RT because of fix low Peers - waitForWellFormedTables(t, []*IpfsDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second) + waitForWellFormedTables(t, []*KadDHT{mainD}, minRTRefreshThreshold, minRTRefreshThreshold, 5*time.Second) } func TestProvideDisabled(t *testing.T) { @@ -1625,19 +1625,19 @@ func TestHandleRemotePeerProtocolChanges(t *testing.T) { connect(t, ctx, dhtA, dhtB) // now assert both have each other in their RT - waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second) + waitForWellFormedTables(t, []*KadDHT{dhtA, dhtB}, 1, 1, 10*time.Second) // dhtB becomes a client require.NoError(t, dhtB.setMode(modeClient)) // which means that dhtA should evict it from it's RT - waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second) + waitForWellFormedTables(t, []*KadDHT{dhtA}, 0, 0, 10*time.Second) // dhtB becomes a server require.NoError(t, dhtB.setMode(modeServer)) // which 
means dhtA should have it in the RT again because of fixLowPeers - waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second) + waitForWellFormedTables(t, []*KadDHT{dhtA}, 1, 1, 10*time.Second) } func TestGetSetPluggedProtocol(t *testing.T) { @@ -1876,7 +1876,7 @@ func TestV1ProtocolOverride(t *testing.T) { d3 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto2")) d4 := setupDHT(ctx, t, false) - dhts := []*IpfsDHT{d1, d2, d3, d4} + dhts := []*KadDHT{d1, d2, d3, d4} for i, dout := range dhts { for _, din := range dhts[i+1:] { diff --git a/dual/dual.go b/dual/dual.go index df88dcd14..4de0fbbfa 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -26,8 +26,8 @@ import ( // DHT implements the routing interface to provide two concrete DHT implementationts for use // in IPFS that are used to support both global network users and disjoint LAN usecases. type DHT struct { - WAN *dht.IpfsDHT - LAN *dht.IpfsDHT + WAN *dht.KadDHT + LAN *dht.KadDHT } // LanExtension is used to differentiate local protocol requests from those on the WAN DHT. @@ -90,7 +90,7 @@ func DHTOption(opts ...dht.Option) Option { } // New creates a new DualDHT instance. Options provided are forwarded on to the two concrete -// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce +// KadDHT internal constructions, modulo additional options used by the Dual DHT to enforce // the LAN-vs-WAN distinction. // Note: query or routing table functional options provided as arguments to this function // will be overriden by this constructor. diff --git a/dual/dual_test.go b/dual/dual_test.go index 323ff1169..53322873f 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -33,9 +33,9 @@ type customRtHelper struct { allow peer.ID } -func MkFilterForPeer() (func(d *dht.IpfsDHT, conns []network.Conn) bool, *customRtHelper) { +func MkFilterForPeer() (func(d *dht.KadDHT, conns []network.Conn) bool, *customRtHelper) { helper := customRtHelper{} - f := func(_ *dht.IpfsDHT, conns []network.Conn) bool { + f := func(_ *dht.KadDHT, conns []network.Conn) bool { for _, c := range conns { if c.RemotePeer() == helper.allow { return true @@ -98,7 +98,7 @@ func setupDHT(ctx context.Context, t *testing.T, options ...dht.Option) *DHT { return d } -func connect(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { +func connect(ctx context.Context, t *testing.T, a, b *dht.KadDHT) { t.Helper() bid := b.PeerID() baddr := b.Host().Peerstore().Addrs(bid) @@ -112,7 +112,7 @@ func connect(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { wait(ctx, t, a, b) } -func wait(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { +func wait(ctx context.Context, t *testing.T, a, b *dht.KadDHT) { t.Helper() for a.RoutingTable().Find(b.PeerID()) == "" { //fmt.Fprintf(os.Stderr, "%v\n", a.RoutingTable().GetPeerInfos()) @@ -124,7 +124,7 @@ func wait(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { } } -func setupTier(ctx context.Context, t *testing.T) (*DHT, *dht.IpfsDHT, *dht.IpfsDHT) { +func setupTier(ctx context.Context, t *testing.T) (*DHT, *dht.KadDHT, *dht.KadDHT) { t.Helper() baseOpts := []dht.Option{ dht.NamespacedValidator("v", blankValidator{}), diff --git a/handlers.go b/handlers.go index 5160232c0..dcc7990bf 100644 --- a/handlers.go +++ b/handlers.go @@ -23,7 +23,7 @@ import ( // dhthandler specifies the signature of functions that handle DHT messages. 
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) -func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { +func (dht *KadDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { switch t { case pb.Message_FIND_NODE: return dht.handleFindPeer @@ -52,7 +52,7 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { return nil } -func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { +func (dht *KadDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { // first, is there even a key? k := pmes.GetKey() if len(k) == 0 { @@ -90,7 +90,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess return resp, nil } -func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) { +func (dht *KadDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) { logger.Debugf("%s handleGetValue looking into ds", dht.self) dskey := convertToDsKey(k) buf, err := dht.datastore.Get(dskey) @@ -149,7 +149,7 @@ func cleanRecord(rec *recpb.Record) { } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { +func (dht *KadDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { if len(pmes.GetKey()) == 0 { return nil, errors.New("handleGetValue but no key was provided") } @@ -220,7 +220,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess // returns nil, nil when either nothing is found or the value found doesn't properly validate. // returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong) -func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) { +func (dht *KadDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) { buf, err := dht.datastore.Get(dskey) if err == ds.ErrNotFound { return nil, nil @@ -248,12 +248,12 @@ func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) return rec, nil } -func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { +func (dht *KadDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { logger.Debugf("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { +func (dht *KadDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { resp := pb.NewMessage(pmes.GetType(), nil, pmes.GetClusterLevel()) var closest []peer.ID @@ -307,7 +307,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, from peer.ID, pmes *pb.M return resp, nil } -func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { +func (dht *KadDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { key := pmes.GetKey() if len(key) > 80 { return nil, fmt.Errorf("handleGetProviders key size too large") @@ -337,7 +337,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. 
return resp, nil } -func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { +func (dht *KadDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, _err error) { key := pmes.GetKey() if len(key) > 80 { return nil, fmt.Errorf("handleAddProvider key size too large") diff --git a/lookup.go b/lookup.go index dff8bb244..a45531346 100644 --- a/lookup.go +++ b/lookup.go @@ -16,7 +16,7 @@ import ( // // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. -func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { +func (dht *KadDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { if key == "" { return nil, fmt.Errorf("can't lookup empty key") } diff --git a/query.go b/query.go index 47d87df05..bc80530ec 100644 --- a/query.go +++ b/query.go @@ -35,7 +35,7 @@ type query struct { // the query context. ctx context.Context - dht *IpfsDHT + dht *KadDHT // seedPeers is the set of peers that seed the query seedPeers []peer.ID @@ -76,7 +76,7 @@ type lookupWithFollowupResult struct { // // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the // lookup that have not already been successfully queried. -func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *KadDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { // run the query lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn) if err != nil { @@ -145,7 +145,7 @@ processFollowUp: return lookupRes, nil } -func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { +func (dht *KadDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { // pick the K closest peers to the key in our Routing table. targetKadID := kb.ConvertKey(target) seedPeers := dht.routingTable.NearestPeers(targetKadID, dht.bucketSize) @@ -492,7 +492,7 @@ func (q *query) updateState(ctx context.Context, up *queryUpdate) { } } -func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error { +func (dht *KadDHT) dialPeer(ctx context.Context, p peer.ID) error { // short-circuit if we're already connected. if dht.host.Network().Connectedness(p) == network.Connected { return nil diff --git a/query_test.go b/query_test.go index e79af514f..bc96a21bd 100644 --- a/query_test.go +++ b/query_test.go @@ -111,7 +111,7 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) { })) } -func checkRoutingTable(a, b *IpfsDHT) bool { +func checkRoutingTable(a, b *KadDHT) bool { // loop until connection notification has been received. // under high load, this may not happen as immediately as we would like. return a.routingTable.Find(b.self) != "" && b.routingTable.Find(a.self) != "" diff --git a/records.go b/records.go index bba505080..b6fd4ec5a 100644 --- a/records.go +++ b/records.go @@ -17,7 +17,7 @@ type pubkrs struct { // GetPublicKey gets the public key when given a Peer ID. It will extract from // the Peer ID if inlined or ask the node it belongs to or ask the DHT. 
-func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { +func (dht *KadDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { logger.Debugf("getPublicKey for: %s", p) // Check locally. Will also try to extract the public key from the peer @@ -68,7 +68,7 @@ func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, err return nil, err } -func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubKey, error) { +func (dht *KadDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubKey, error) { // Only retrieve one value, because the public key is immutable // so there's no need to retrieve multiple versions pkkey := routing.KeyForPublicKey(p) @@ -89,7 +89,7 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK return pubk, nil } -func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) { +func (dht *KadDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) { // check locally, just in case... pk := dht.peerstore.PubKey(p) if pk != nil { diff --git a/routing.go b/routing.go index d14e3845f..36549fc1b 100644 --- a/routing.go +++ b/routing.go @@ -21,13 +21,13 @@ import ( "github.com/multiformats/go-multihash" ) -// This file implements the Routing interface for the IpfsDHT struct. +// This file implements the Routing interface for the KadDHT struct. // Basic Put/Get // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT -func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { +func (dht *KadDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { if !dht.enableValues { return routing.ErrNotSupported } @@ -99,7 +99,7 @@ type RecvdVal struct { } // GetValue searches for the value corresponding to given Key. -func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { +func (dht *KadDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { if !dht.enableValues { return nil, routing.ErrNotSupported } @@ -133,7 +133,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op } // SearchValue searches for the value corresponding to given Key and streams the results. -func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { +func (dht *KadDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { if !dht.enableValues { return nil, routing.ErrNotSupported } @@ -181,7 +181,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing return out, nil } -func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{}, +func (dht *KadDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{}, out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) { numResponses := 0 return dht.processValues(ctx, key, valCh, @@ -204,7 +204,7 @@ func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-c } // GetValues gets nvals values corresponding to the given key. 
-func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { +func (dht *KadDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { if !dht.enableValues { return nil, routing.ErrNotSupported } @@ -224,7 +224,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R return out, ctx.Err() } -func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, +func (dht *KadDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal, newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { loop: for { @@ -267,7 +267,7 @@ loop: return } -func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) { +func (dht *KadDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) { fixupRec := record.MakePutRecord(key, val) for _, p := range peers { go func(p peer.ID) { @@ -289,7 +289,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte } } -func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { +func (dht *KadDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) { valCh := make(chan RecvdVal, 1) lookupResCh := make(chan *lookupWithFollowupResult, 1) @@ -380,7 +380,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st return valCh, lookupResCh } -func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) { +func (dht *KadDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) { if lookupRes.completed { // refresh the cpl for this key as the query was successful dht.routingTable.ResetCplRefreshedAtForID(key, time.Now()) @@ -392,7 +392,7 @@ func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollow // locations of the value, similarly to Coral and Mainline DHT. // Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { +func (dht *KadDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { if !dht.enableProviders { return routing.ErrNotSupported } else if !key.Defined() { @@ -464,7 +464,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err } // FindProviders searches until the context expires. -func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) { +func (dht *KadDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) { if !dht.enableProviders { return nil, routing.ErrNotSupported } else if !c.Defined() { @@ -483,7 +483,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn // the search query completes. If count is zero then the query will run until it // completes. Note: not reading from the returned channel may block the query // from progressing. 
-func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { +func (dht *KadDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { if !dht.enableProviders || !key.Defined() { peerOut := make(chan peer.AddrInfo) close(peerOut) @@ -503,7 +503,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return peerOut } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { +func (dht *KadDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { defer close(peerOut) findAll := count == 0 @@ -589,7 +589,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } // FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { +func (dht *KadDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { if err := id.Validate(); err != nil { return peer.AddrInfo{}, err } diff --git a/rt_diversity_filter_test.go b/rt_diversity_filter_test.go index d439e863a..9d1f1d079 100644 --- a/rt_diversity_filter_test.go +++ b/rt_diversity_filter_test.go @@ -68,8 +68,8 @@ func TestRoutingTableEndToEndMaxPerCpl(t *testing.T) { ) require.NoError(t, err) - var d2 *IpfsDHT - var d3 *IpfsDHT + var d2 *KadDHT + var d3 *KadDHT for { d2 = setupDHT(ctx, t, false) @@ -125,15 +125,15 @@ func TestRoutingTableEndToEndMaxPerTable(t *testing.T) { // only 3 peers per prefix for the table. d2 := setupDHT(ctx, t, false, DisableAutoRefresh()) connect(t, ctx, d, d2) - waitForWellFormedTables(t, []*IpfsDHT{d}, 1, 1, 1*time.Second) + waitForWellFormedTables(t, []*KadDHT{d}, 1, 1, 1*time.Second) d3 := setupDHT(ctx, t, false, DisableAutoRefresh()) connect(t, ctx, d, d3) - waitForWellFormedTables(t, []*IpfsDHT{d}, 2, 2, 1*time.Second) + waitForWellFormedTables(t, []*KadDHT{d}, 2, 2, 1*time.Second) d4 := setupDHT(ctx, t, false, DisableAutoRefresh()) connect(t, ctx, d, d4) - waitForWellFormedTables(t, []*IpfsDHT{d}, 3, 3, 1*time.Second) + waitForWellFormedTables(t, []*KadDHT{d}, 3, 3, 1*time.Second) d5 := setupDHT(ctx, t, false, DisableAutoRefresh()) connectNoSync(t, ctx, d, d5) diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 7cc9018f7..169b342bf 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -17,11 +17,11 @@ import ( // identification events to trigger inclusion in the routing table, and we consume Disconnected events to eject peers // from it. 
type subscriberNotifee struct { - dht *IpfsDHT + dht *KadDHT subs event.Subscription } -func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { +func newSubscriberNotifiee(dht *KadDHT) (*subscriberNotifee, error) { bufSize := eventbus.BufSize(256) evts := []interface{}{ @@ -102,7 +102,7 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { } } -func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { +func handlePeerChangeEvent(dht *KadDHT, p peer.ID) { valid, err := dht.validRTPeer(p) if err != nil { logger.Errorf("could not check peerstore for protocol support: err: %s", err) @@ -115,7 +115,7 @@ func handlePeerChangeEvent(dht *IpfsDHT, p peer.ID) { } } -func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabilityChanged) { +func handleLocalReachabilityChangedEvent(dht *KadDHT, e event.EvtLocalReachabilityChanged) { var target mode switch e.Reachability { @@ -145,7 +145,7 @@ func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabil // validRTPeer returns true if the peer supports the DHT protocol and false otherwise. Supporting the DHT protocol means // supporting the primary protocols, we do not want to add peers that are speaking obsolete secondary protocols to our // routing table -func (dht *IpfsDHT) validRTPeer(p peer.ID) (bool, error) { +func (dht *KadDHT) validRTPeer(p peer.ID) (bool, error) { b, err := dht.peerstore.FirstSupportedProtocol(p, dht.protocolsStrs...) if len(b) == 0 || err != nil { return false, err
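
Note on the compatibility shim near the top of dht.go: the patch declares "type IpfsDHT KadDHT" while the new doc comment describes IpfsDHT as "an alias for KadDHT". In Go those are different declarations. A declaration without '=' creates a new defined type with its own empty method set, so existing code holding a *dht.IpfsDHT could no longer call the DHT methods or satisfy the routing interfaces without an explicit conversion; "type IpfsDHT = KadDHT" is the form that behaves as a true alias. Below is a minimal, self-contained sketch of the difference, using a toy KadDHT stand-in rather than the real go-libp2p-kad-dht types (the struct, its field, and the PeerID method here are placeholders for illustration, not the library's API):

package main

import "fmt"

// KadDHT is a toy stand-in for the renamed DHT type.
type KadDHT struct{ self string }

// PeerID mirrors a method declared on the renamed *KadDHT receiver.
func (d *KadDHT) PeerID() string { return d.self }

// IpfsDHTAlias is a true alias ('='): it is the same type as KadDHT, so it
// keeps every method and satisfies every interface that KadDHT does.
type IpfsDHTAlias = KadDHT

// IpfsDHTDefined is a defined type (no '='): it copies KadDHT's layout but
// starts with an empty method set, so method calls on it no longer compile
// without converting back to *KadDHT.
type IpfsDHTDefined KadDHT

func main() {
	d := &KadDHT{self: "QmExample"}

	var a *IpfsDHTAlias = d // same type, assignable directly
	fmt.Println(a.PeerID()) // method set carries over unchanged

	def := (*IpfsDHTDefined)(d) // defined type needs an explicit conversion
	// def.PeerID() would not compile: *IpfsDHTDefined has no methods.
	fmt.Println((*KadDHT)(def).PeerID()) // must convert back to reach the methods
}

If the intent of the rename is that downstream callers keep compiling unchanged against *dht.IpfsDHT, the '=' alias form is the one that preserves that behaviour; the defined-type form shown in the patch requires callers either to convert explicitly or to migrate to *dht.KadDHT.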