Skip to content

Commit

Permalink
Merge pull request #205 from jbenet/fix/move_proto_dht
Browse files Browse the repository at this point in the history
redux: refactor(dht/pb) move proto to pb package
  • Loading branch information
jbenet committed Oct 25, 2014
2 parents 2ce4187 + 0dba976 commit 40ab188
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 85 deletions.
3 changes: 1 addition & 2 deletions exchange/bitswap/strategy/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"sync"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
"github.com/jbenet/go-ipfs/peer"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)

// TODO declare thread-safe datastore
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely
Expand Down
11 changes: 0 additions & 11 deletions routing/dht/Makefile

This file was deleted.

41 changes: 21 additions & 20 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
inet "github.com/jbenet/go-ipfs/net"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket"
u "github.com/jbenet/go-ipfs/util"

Expand Down Expand Up @@ -128,7 +129,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
}

// deserialize msg
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mData, pmes)
if err != nil {
log.Error("Error unmarshaling data")
Expand All @@ -140,7 +141,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N

// Print out diagnostic
log.Debugf("%s got message type: '%s' from %s",
dht.self, Message_MessageType_name[int32(pmes.GetType())], mPeer)
dht.self, pb.Message_MessageType_name[int32(pmes.GetType())], mPeer)

// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
Expand Down Expand Up @@ -174,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N

// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {

mes, err := msg.FromObject(p, pmes)
if err != nil {
Expand All @@ -185,7 +186,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)

// Print out diagnostic
log.Debugf("Sent message type: '%s' to %s",
Message_MessageType_name[int32(pmes.GetType())], p)
pb.Message_MessageType_name[int32(pmes.GetType())], p)

rmes, err := dht.sender.SendRequest(ctx, mes)
if err != nil {
Expand All @@ -198,7 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
rtt := time.Since(start)
rmes.Peer().SetLatency(rtt)

rpmes := new(Message)
rpmes := new(pb.Message)
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
return nil, err
}
Expand All @@ -210,7 +211,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {

pmes := newMessage(Message_PUT_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Value = value
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
Expand All @@ -225,10 +226,10 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,

func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {

pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)

// add self as the provider
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})

rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
Expand Down Expand Up @@ -290,9 +291,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*Message, error) {
key u.Key, level int) (*pb.Message, error) {

pmes := newMessage(Message_GET_VALUE, string(key), level)
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}

Expand All @@ -301,7 +302,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*Message_Peer, level int) ([]byte, error) {
peerlist []*pb.Message_Peer, level int) ([]byte, error) {

for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(pinfo)
Expand Down Expand Up @@ -379,17 +380,17 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
return nil, nil
}

func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {
p, err := dht.peerFromInfo(prov)
Expand All @@ -413,7 +414,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
}

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]

Expand All @@ -423,7 +424,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
}

// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
closer := dht.nearestPeersToQuery(pmes, count)

// no node? nil
Expand Down Expand Up @@ -462,7 +463,7 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
return p, nil
}

func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {

id := peer.ID(pbp.GetId())

Expand All @@ -485,7 +486,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
return p, nil
}

func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err
Expand Down
27 changes: 14 additions & 13 deletions routing/dht/ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"

"time"
Expand Down Expand Up @@ -127,13 +128,13 @@ func TestGetFailures(t *testing.T) {
// u.POut("NotFound Test\n")
// Reply with failures to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

resp := &Message{
resp := &pb.Message{
Type: pmes.Type,
}
m, err := msg.FromObject(mes.Peer(), resp)
Expand All @@ -153,9 +154,9 @@ func TestGetFailures(t *testing.T) {

fs.handlers = nil
// Now we test this DHT's handleGetValue failure
typ := Message_GET_VALUE
typ := pb.Message_GET_VALUE
str := "hello"
req := Message{
req := pb.Message{
Type: &typ,
Key: &str,
Value: []byte{0},
Expand All @@ -169,7 +170,7 @@ func TestGetFailures(t *testing.T) {

mes = d.HandleMessage(ctx, mes)

pmes := new(Message)
pmes := new(pb.Message)
err = proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -215,21 +216,21 @@ func TestNotFound(t *testing.T) {

// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{Type: pmes.Type}
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}

peers := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
}
resp.CloserPeers = peersToPBPeers(peers)
resp.CloserPeers = pb.PeersToPBPeers(peers)
mes, err := msg.FromObject(mes.Peer(), resp)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -282,17 +283,17 @@ func TestLessThanKResponses(t *testing.T) {

// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
pmes := new(Message)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
}

switch pmes.GetType() {
case Message_GET_VALUE:
resp := &Message{
case pb.Message_GET_VALUE:
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: peersToPBPeers([]peer.Peer{other}),
CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
}

mes, err := msg.FromObject(mes.Peer(), resp)
Expand Down
Loading

0 comments on commit 40ab188

Please sign in to comment.