Skip to content

Commit

Permalink
Upgrade DHT version (ipfs#479)
Browse files Browse the repository at this point in the history
* upgraded the protocol id to version 2 (i.e. /kad/2.0.0) and made it so v2 peers running in server mode respond to queries from v1 peers. Note: v2 peers will only send queries using the v2 protocol, will only add v2 peers to their routing tables, and will only tell v1 peers about v2 peers.
* to run a forked network we now use network specific protocol prefixes instead of manually setting protocol IDs. Use the ProtocolPrefix option instead of the Protocols option.
* emit errors during initialization if the user misuses the default protocol prefix by setting parameters inconsistent with the default protocol's network specification
* since the Client option has been deprecated it's been removed from the dht's options. While deprecated it is still available in the dht options package. Setting `Client(false)` now puts the node into ModeAuto.
  • Loading branch information
aschmahmann authored and Stebalien committed Apr 3, 2020
1 parent 30fa086 commit 07dff1f
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 103 deletions.
62 changes: 44 additions & 18 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ const (
modeClient = 2
)

const (
kad1 protocol.ID = "/kad/1.0.0"
kad2 protocol.ID = "/kad/2.0.0"
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -73,7 +78,11 @@ type IpfsDHT struct {

stripedPutLocks [256]sync.Mutex

protocols []protocol.ID // DHT protocols
// Primary DHT protocols - we query and respond to these protocols
protocols []protocol.ID

// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
serverProtocols []protocol.ID

auto bool
mode mode
Expand Down Expand Up @@ -109,12 +118,16 @@ var (
// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.Apply(append([]Option{defaults}, options...)...); err != nil {
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
}
if cfg.disjointPaths == 0 {
cfg.disjointPaths = cfg.bucketSize / 2
if err := cfg.applyFallbacks(); err != nil {
return nil, err
}
if err := cfg.validate(); err != nil {
return nil, err
}

dht := makeDHT(ctx, h, cfg)
dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
Expand Down Expand Up @@ -175,7 +188,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, Datastore(dstore), Client(true))
dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
if err != nil {
panic(err)
}
Expand All @@ -196,6 +209,19 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
cmgr.UntagPeer(p, "kbucket")
}

protocols := []protocol.ID{cfg.protocolPrefix + kad2}
serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}

// check if custom test protocols were set
if len(cfg.testProtocols) > 0 {
protocols = make([]protocol.ID, len(cfg.testProtocols))
serverProtocols = make([]protocol.ID, len(cfg.testProtocols))
for i, p := range cfg.testProtocols {
protocols[i] = cfg.protocolPrefix + p
serverProtocols[i] = cfg.protocolPrefix + p
}
}

dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
Expand All @@ -205,7 +231,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: cfg.protocols,
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
d: cfg.disjointPaths,
Expand Down Expand Up @@ -483,22 +510,30 @@ func (dht *IpfsDHT) setMode(m mode) error {
}
}

// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}

// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
}

pset := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
pset[p] = true
}

Expand Down Expand Up @@ -540,15 +575,6 @@ func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}

func (dht *IpfsDHT) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
}

return pstrs
}

func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
Expand Down
3 changes: 3 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ func (ms *messageSender) prep(ctx context.Context) error {
return nil
}

// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
// backwards compatibility reasons).
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 07dff1f

Please sign in to comment.