diff --git a/dht.go b/dht.go index da3c5cc33..316ecb401 100644 --- a/dht.go +++ b/dht.go @@ -29,9 +29,12 @@ import ( logging "github.com/ipfs/go-log" goprocess "github.com/jbenet/goprocess" goprocessctx "github.com/jbenet/goprocess/context" + circuit "github.com/libp2p/go-libp2p-circuit" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" base32 "github.com/whyrusleeping/base32" ) @@ -274,8 +277,82 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { // Update signals the routingTable to Update its last-seen status // on the given peer. func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { - logger.Event(ctx, "updatePeer", p) - dht.routingTable.Update(p) + conns := dht.host.Network().ConnsToPeer(p) + switch len(conns) { + default: + logger.Warning("unclear if we should add peer with multiple open connections to us to our routing table") + case 0: + logger.Error("called update on a peer with no connection") + case 1: + dht.UpdateConn(ctx, conns[0]) + } +} + +func (dht *IpfsDHT) UpdateConn(ctx context.Context, c network.Conn) { + if dht.shouldAddPeerToRoutingTable(c) { + logger.Event(ctx, "updatePeer", c.RemotePeer()) + dht.routingTable.Update(c.RemotePeer()) + } +} + +func (dht *IpfsDHT) shouldAddPeerToRoutingTable(c network.Conn) bool { + if isRelayAddr(c.RemoteMultiaddr()) { + return false + } + if c.Stat().Direction == network.DirOutbound { + // we established this connection, so they're definitely diallable. + return true + } + + ai := dht.host.Peerstore().PeerInfo(c.RemotePeer()) + if dht.peerIsOnSameSubnet(c) { + // TODO: for now, we can't easily tell if the peer on our subnet + // is dialable or not, so don't discriminate. + + // We won't return these peers in queries unless the requester's + // remote addr is also private. + return len(ai.Addrs) > 0 + } + + return dht.isPubliclyRoutable(ai) +} + +func (dht *IpfsDHT) isPubliclyRoutable(ai peer.AddrInfo) bool { + if len(ai.Addrs) == 0 { + return false + } + + var hasPublicAddr bool + for _, a := range ai.Addrs { + if isRelayAddr(a) { + return false + } + if manet.IsPublicAddr(a) { + hasPublicAddr = true + } + } + return hasPublicAddr +} + +// taken from go-libp2p/p2p/host/relay +func isRelayAddr(a ma.Multiaddr) bool { + isRelay := false + + ma.ForEach(a, func(c ma.Component) bool { + switch c.Protocol().Code { + case circuit.P_CIRCUIT: + isRelay = true + return false + default: + return true + } + }) + + return isRelay +} + +func (dht *IpfsDHT) peerIsOnSameSubnet(c network.Conn) bool { + return manet.IsPrivateAddr(c.RemoteMultiaddr()) } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. diff --git a/dht_net.go b/dht_net.go index 685b155be..ae9a23e0c 100644 --- a/dht_net.go +++ b/dht_net.go @@ -139,7 +139,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { return false } - dht.updateFromMessage(ctx, mPeer, &req) + dht.updateFromMessage(ctx, s.Conn(), &req) if resp == nil { continue @@ -179,7 +179,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message } // update the peer (on valid msgs only) - dht.updateFromMessage(ctx, p, rpmes) + dht.updateFromMessage(ctx, ms.s.Conn(), rpmes) stats.Record( ctx, @@ -218,11 +218,11 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message return nil } -func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error { +func (dht *IpfsDHT) updateFromMessage(ctx context.Context, c network.Conn, mes *pb.Message) error { // Make sure that this node is actually a DHT server, not just a client. - protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...) + protos, err := dht.peerstore.SupportsProtocols(c.RemotePeer(), dht.protocolStrs()...) if err == nil && len(protos) > 0 { - dht.Update(ctx, p) + dht.UpdateConn(ctx, c) } return nil } diff --git a/ext_test.go b/ext_test.go index cb0ac7634..d7754bb98 100644 --- a/ext_test.go +++ b/ext_test.go @@ -33,6 +33,9 @@ func TestGetFailures(t *testing.T) { if err != nil { t.Fatal(err) } + + // TODO: replace with identify event bus event + time.Sleep(time.Millisecond * 100) d.Update(ctx, hosts[1].ID()) // Reply with failures to every message @@ -302,6 +305,8 @@ func TestMultipleQueries(t *testing.T) { t.Fatal(err) } + // TODO: need to wait for Identify Event from event bus + time.Sleep(time.Millisecond * 100) d.Update(ctx, hosts[1].ID()) // It would be nice to be able to just get a value and succeed but then diff --git a/go.mod b/go.mod index 1cbc5a07e..dec761f87 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/ipfs/go-todocounter v0.0.1 github.com/jbenet/goprocess v0.1.3 github.com/libp2p/go-libp2p v0.1.0 + github.com/libp2p/go-libp2p-circuit v0.1.0 github.com/libp2p/go-libp2p-core v0.0.1 github.com/libp2p/go-libp2p-kbucket v0.2.0 github.com/libp2p/go-libp2p-peerstore v0.1.0 @@ -21,6 +22,7 @@ require ( github.com/mr-tron/base58 v1.1.2 github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr-dns v0.0.2 + github.com/multiformats/go-multiaddr-net v0.0.1 github.com/multiformats/go-multistream v0.1.0 github.com/stretchr/testify v1.3.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc diff --git a/go.sum b/go.sum index bd8387422..f1d2fc11f 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,7 @@ github.com/libp2p/go-libp2p v0.1.0/go.mod h1:6D/2OBauqLUoqcADOJpn9WbKqvaM07tDw68 github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8= github.com/libp2p/go-libp2p-blankhost v0.1.1 h1:X919sCh+KLqJcNRApj43xCSiQRYqOSI88Fdf55ngf78= github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro= +github.com/libp2p/go-libp2p-circuit v0.1.0 h1:eniLL3Y9aq/sryfyV1IAHj5rlvuyj3b7iz8tSiZpdhY= github.com/libp2p/go-libp2p-circuit v0.1.0/go.mod h1:Ahq4cY3V9VJcHcn1SBXjr78AbFkZeIRmfunbA7pmFh8= github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= diff --git a/notif.go b/notif.go index 3af758492..9d1d15c93 100644 --- a/notif.go +++ b/notif.go @@ -32,7 +32,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { dht.plk.Lock() defer dht.plk.Unlock() if dht.host.Network().Connectedness(p) == network.Connected { - dht.Update(dht.Context(), p) + dht.UpdateConn(dht.Context(), v) } return } @@ -71,7 +71,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) { dht.plk.Lock() defer dht.plk.Unlock() if dht.host.Network().Connectedness(p) == network.Connected { - dht.Update(dht.Context(), p) + dht.UpdateConn(dht.Context(), v) } }