Skip to content

Commit

Permalink
migrate to consolidated types. (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk authored May 26, 2019
1 parent 978eca5 commit 3176535
Show file tree
Hide file tree
Showing 22 changed files with 283 additions and 318 deletions.
31 changes: 16 additions & 15 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"

"go.opencensus.io/tag"
"golang.org/x/xerrors"

Expand All @@ -22,15 +29,9 @@ import (
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
host "github.com/libp2p/go-libp2p-host"
kb "github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
routing "github.com/libp2p/go-libp2p-routing"
base32 "github.com/whyrusleeping/base32"
)

Expand All @@ -41,11 +42,11 @@ var logger = logging.Logger("dht")
const NumBootstrapQueries = 5

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base IpfsRouting module.
// It is used to implement the base Routing module.
type IpfsDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore pstore.Peerstore // Peer Registry
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore peerstore.Peerstore // Peer Registry

datastore ds.Datastore // Local data

Expand All @@ -71,7 +72,7 @@ type IpfsDHT struct {
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
_ routing.IpfsRouting = (*IpfsDHT)(nil)
_ routing.Routing = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
Expand Down Expand Up @@ -182,7 +183,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*pstore.PeerInfo, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
Expand Down Expand Up @@ -278,12 +279,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.host.Network().Connectedness(id) {
case inet.Connected, inet.CanConnect:
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
default:
return pstore.PeerInfo{}
return peer.AddrInfo{}
}
}

Expand Down
10 changes: 5 additions & 5 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

u "github.com/ipfs/go-ipfs-util"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
routing "github.com/libp2p/go-libp2p-routing"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
)

Expand Down Expand Up @@ -112,7 +112,7 @@ func newRandomPeerId() peer.ID {
}

// Traverse the DHT toward the given ID.
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (peer.AddrInfo, error) {
// TODO: Extract the query action (traversal logic?) inside FindPeer,
// don't actually call through the FindPeer machinery, which can return
// things out of the peer store etc.
Expand Down
27 changes: 16 additions & 11 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
"sync"
"time"

ggio "github.com/gogo/protobuf/io"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"

ggio "github.com/gogo/protobuf/io"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
Expand Down Expand Up @@ -55,8 +59,8 @@ func (w *bufferedDelimitedWriter) Flush() error {
return w.Writer.Flush()
}

// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
// handleNewStream implements the network.StreamHandler
func (dht *IpfsDHT) handleNewStream(s network.Stream) {
defer s.Reset()
if dht.handleNewMessage(s) {
// Gracefully close the stream for writes.
Expand All @@ -65,9 +69,10 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
}

// Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
r := ggio.NewDelimitedReader(s, network.MessageSizeMax)

mPeer := s.Conn().RemotePeer()

timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() })
Expand Down Expand Up @@ -242,7 +247,7 @@ func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messa
}

type messageSender struct {
s inet.Stream
s network.Stream
r ggio.ReadCloser
lk sync.Mutex
p peer.ID
Expand Down Expand Up @@ -286,7 +291,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return err
}

ms.r = ggio.NewDelimitedReader(nstr, inet.MessageSizeMax)
ms.r = ggio.NewDelimitedReader(nstr, network.MessageSizeMax)
ms.s = nstr

return nil
Expand Down Expand Up @@ -322,7 +327,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
go helpers.FullClose(ms.s)
ms.s = nil
} else if retry {
ms.singleMes++
Expand Down Expand Up @@ -371,7 +376,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)

if ms.singleMes > streamReuseTries {
go inet.FullClose(ms.s)
go helpers.FullClose(ms.s)
ms.s = nil
} else if retry {
ms.singleMes++
Expand Down
29 changes: 15 additions & 14 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"

multistream "github.com/multiformats/go-multistream"

"golang.org/x/xerrors"
Expand All @@ -25,14 +29,11 @@ import (
cid "github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
kb "github.com/libp2p/go-libp2p-kbucket"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ci "github.com/libp2p/go-libp2p-testing/ci"
travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ci "github.com/libp2p/go-testutil/ci"
travisci "github.com/libp2p/go-testutil/ci/travis"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -127,8 +128,8 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Fatal("peers setup incorrectly: no local address")
}

a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: idB}
a.peerstore.AddAddrs(idB, addrB, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: idB}
if err := a.host.Connect(ctx, pi); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1012,7 +1013,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
}

// shouldFind := []peer.ID{peers[1], peers[3]}
var found []*pstore.PeerInfo
var found []*peer.AddrInfo
for nextp := range pchan {
found = append(found, nextp)
}
Expand Down Expand Up @@ -1056,14 +1057,14 @@ func TestConnectCollision(t *testing.T) {

errs := make(chan error)
go func() {
dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: peerB}
dhtA.peerstore.AddAddr(peerB, addrB, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: peerB}
err := dhtA.host.Connect(ctx, pi)
errs <- err
}()
go func() {
dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: peerA}
dhtB.peerstore.AddAddr(peerA, addrA, peerstore.TempAddrTTL)
pi := peer.AddrInfo{ID: peerA}
err := dhtB.host.Connect(ctx, pi)
errs <- err
}()
Expand Down Expand Up @@ -1373,7 +1374,7 @@ func TestPing(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ds := setupDHTS(t, ctx, 2)
ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), pstore.AddressTTL)
ds[0].Host().Peerstore().AddAddrs(ds[1].PeerID(), ds[1].Host().Addrs(), peerstore.AddressTTL)
assert.NoError(t, ds[0].Ping(context.Background(), ds[1].PeerID()))
}

Expand All @@ -1382,7 +1383,7 @@ func TestClientModeAtInit(t *testing.T) {
defer cancel()
pinger := setupDHT(ctx, t, false)
client := setupDHT(ctx, t, true)
pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), pstore.AddressTTL)
pinger.Host().Peerstore().AddAddrs(client.PeerID(), client.Host().Addrs(), peerstore.AddressTTL)
err := pinger.Ping(context.Background(), client.PeerID())
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}
2 changes: 1 addition & 1 deletion dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"
"time"

peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

Expand Down
2 changes: 1 addition & 1 deletion dial_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

Expand Down
Loading

0 comments on commit 3176535

Please sign in to comment.