Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade DHT version #479

Merged
merged 9 commits into from
Mar 10, 2020
62 changes: 44 additions & 18 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const (
modeClient = 2
)

const (
kad1 protocol.ID = "/kad/1.0.0"
kad2 protocol.ID = "/kad/2.0.0"
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -73,7 +78,11 @@ type IpfsDHT struct {

stripedPutLocks [256]sync.Mutex

protocols []protocol.ID // DHT protocols
// Primary DHT protocols - we query and respond to these protocols
protocols []protocol.ID

// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
serverProtocols []protocol.ID
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably also need kad1Protocol and kad2Protocol for figuring out how we should respond to peers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, we can tackle that once we actually have different protocol handlers to use (hopefully won't be too long 🙏)


auto bool
mode mode
Expand Down Expand Up @@ -109,12 +118,16 @@ var (
// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.Apply(append([]Option{defaults}, options...)...); err != nil {
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
}
if cfg.disjointPaths == 0 {
cfg.disjointPaths = cfg.bucketSize / 2
if err := cfg.applyFallbacks(); err != nil {
return nil, err
}
if err := cfg.validate(); err != nil {
return nil, err
}

dht := makeDHT(ctx, h, cfg)
dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
Expand Down Expand Up @@ -175,7 +188,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, Datastore(dstore), Client(true))
dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
if err != nil {
panic(err)
}
Expand All @@ -196,6 +209,19 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
cmgr.UntagPeer(p, "kbucket")
}

protocols := []protocol.ID{cfg.protocolPrefix + kad2}
serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}

// check if custom test protocols were set
if len(cfg.testProtocols) > 0 {
protocols = make([]protocol.ID, len(cfg.testProtocols))
serverProtocols = make([]protocol.ID, len(cfg.testProtocols))
for i, p := range cfg.testProtocols {
protocols[i] = cfg.protocolPrefix + p
serverProtocols[i] = cfg.protocolPrefix + p
}
}

dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
Expand All @@ -205,7 +231,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: cfg.protocols,
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
d: cfg.disjointPaths,
Expand Down Expand Up @@ -483,22 +510,30 @@ func (dht *IpfsDHT) setMode(m mode) error {
}
}

// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}

// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
}

pset := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
pset[p] = true
}

Expand Down Expand Up @@ -540,15 +575,6 @@ func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}

func (dht *IpfsDHT) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
}

return pstrs
}

func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
Expand Down
3 changes: 3 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ func (ms *messageSender) prep(ctx context.Context) error {
return nil
}

// We only want to speak to peers using our primary protocols. We do not want to query any peer that only speaks
// one of the secondary "server" protocols that we happen to support (e.g. older nodes that we can respond to for
// backwards compatibility reasons).
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
Expand Down
Loading