Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dht mode toggling (modulo dynamic switching) #350

Merged
merged 10 commits into from
Jun 26, 2019
67 changes: 67 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ var logger = logging.Logger("dht")
// collect members of the routing table.
const NumBootstrapQueries = 5

type DHTMode int

const (
ModeServer = DHTMode(1)
ModeClient = DHTMode(2)
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand All @@ -66,6 +73,9 @@ type IpfsDHT struct {
plk sync.Mutex

protocols []protocol.ID // DHT protocols

mode DHTMode
modeLk sync.Mutex
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -97,8 +107,10 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.mode = ModeClient

if !cfg.Client {
dht.mode = ModeServer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call modeToServer?

for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
Expand Down Expand Up @@ -365,6 +377,61 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
return filtered
}

func (dht *IpfsDHT) SetMode(m DHTMode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()

if m == dht.mode {
return nil
}

switch m {
case ModeServer:
return dht.moveToServerMode()
case ModeClient:
return dht.moveToClientMode()
default:
raulk marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("unrecognized dht mode: %d", m)
}
}

func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = ModeServer
for _, p := range dht.protocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}

func (dht *IpfsDHT) moveToClientMode() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fine but I'd feel safer if we checked if we were in client mode in the stream handler, just in case.

dht.mode = ModeClient
for _, p := range dht.protocols {
dht.host.RemoveStreamHandler(p)
}

pset := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
pset[p] = true
}

for _, c := range dht.host.Network().Conns() {
for _, s := range c.GetStreams() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: backwards-compatibility. This is where the tricky part comes in. This drops streams for peers in our routing table. It will not invite the peer to drop us from theirs. So they’ll keep querying us, and we’ll “na” all their negotiations. We’ll basically become an unhelpful peer taking up a slot in their table, unless we disconnect to trigger the removal.

On a related note, there seems to be a race between identify and the DHT notifee. Even if we disconnect and reconnect, if the DHT notifee runs before identify finishes, we might be deciding on stale protocols: https://github.com/libp2p/go-libp2p-kad-dht/blob/master/notif.go#L27

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... the backwards compatibility bit kinda sucks. I don't know that a nice backwards compatible solution exists aside from hard disconnecting from those peers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd much rather track streams manually.

if pset[s.Protocol()] {
if s.Stat().Direction == network.DirInbound {
s.Reset()
}
}
}
}
return nil
}

func (dht *IpfsDHT) getMode() DHTMode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
Expand Down
18 changes: 18 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"

Expand Down Expand Up @@ -79,6 +80,11 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
defer timer.Stop()

for {
if dht.getMode() != ModeServer {
logger.Errorf("responding to dht message while not in server mode")
whyrusleeping marked this conversation as resolved.
Show resolved Hide resolved
return false
}

var req pb.Message
switch err := r.ReadMsg(&req); err {
case io.EOF:
Expand Down Expand Up @@ -153,6 +159,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
raulk marked this conversation as resolved.
Show resolved Hide resolved
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentRequestErrors.M(1))
return nil, err
}
Expand All @@ -161,6 +170,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentRequestErrors.M(1))
return nil, err
}
Expand All @@ -187,11 +199,17 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentMessageErrors.M(1))
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentMessageErrors.M(1))
return err
}
Expand Down