Skip to content

Commit

Permalink
Merge pull request ipfs#17 from libp2p/feat/no-more-keys
Browse files Browse the repository at this point in the history
Feat/no more keys
  • Loading branch information
whyrusleeping authored Sep 30, 2016
2 parents 347e481 + 2604f34 commit 596b945
Show file tree
Hide file tree
Showing 14 changed files with 258 additions and 286 deletions.
56 changes: 32 additions & 24 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,31 @@ package dht

import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
routing "github.com/libp2p/go-libp2p-routing"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
key "github.com/ipfs/go-key"
ci "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
routing "github.com/libp2p/go-libp2p-routing"
host "github.com/libp2p/go-libp2p/p2p/host"
protocol "github.com/libp2p/go-libp2p/p2p/protocol"
context "golang.org/x/net/context"
base32 "github.com/whyrusleeping/base32"
)

var log = logging.Logger("dht")
Expand Down Expand Up @@ -131,9 +133,9 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
key key.Key, rec *recpb.Record) error {
key string, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_PUT_VALUE, key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
switch err {
Expand Down Expand Up @@ -162,8 +164,7 @@ var errInvalidRecord = errors.New("received invalid record")
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
key key.Key) (*recpb.Record, []pstore.PeerInfo, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []pstore.PeerInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
Expand Down Expand Up @@ -198,11 +199,15 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
}

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
key key.Key) (*pb.Message, error) {
defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
meta := logging.LoggableMap{
"key": key,
"peer": p,
}

defer log.EventBegin(ctx, "getValueSingle", meta).Done()

pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
pmes := pb.NewMessage(pb.Message_GET_VALUE, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
Expand All @@ -216,14 +221,14 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
}

// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key key.Key) (*recpb.Record, error) {
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
log.Debugf("getLocal %s", key)

log.Debug("getLocal %s", key)
v, err := dht.datastore.Get(key.DsKey())
v, err := dht.datastore.Get(mkDsKey(key))
if err != nil {
return nil, err
}
log.Debug("found in db")
log.Debugf("found %s in local datastore")

byt, ok := v.([]byte)
if !ok {
Expand Down Expand Up @@ -256,13 +261,13 @@ func (dht *IpfsDHT) getOwnPrivateKey() (ci.PrivKey, error) {
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key key.Key, rec *recpb.Record) error {
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
data, err := proto.Marshal(rec)
if err != nil {
return err
}

return dht.datastore.Put(key.DsKey(), data)
return dht.datastore.Put(mkDsKey(key), data)
}

// Update signals the routingTable to Update its last-seen status
Expand Down Expand Up @@ -298,10 +303,10 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) {
defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
defer log.EventBegin(ctx, "findProvidersSingle", p, key).Done()

pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
Expand All @@ -316,8 +321,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
key := key.Key(pmes.GetKey())
closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
closer := dht.routingTable.NearestPeers(kb.ConvertKey(pmes.GetKey()), count)
return closer
}

Expand Down Expand Up @@ -366,3 +370,7 @@ func (dht *IpfsDHT) Process() goprocess.Process {
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}

func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
2 changes: 1 addition & 1 deletion dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package dht

import (
"context"
"crypto/rand"
"fmt"
"sync"
Expand All @@ -13,7 +14,6 @@ import (
goprocess "github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/goprocess/periodic"
routing "github.com/libp2p/go-libp2p-routing"
context "golang.org/x/net/context"
)

// BootstrapConfig specifies parameters used bootstrapping the DHT.
Expand Down
5 changes: 2 additions & 3 deletions dht_net.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package dht

import (
"context"
"fmt"
"sync"
"time"

ggio "github.com/gogo/protobuf/io"
peer "github.com/ipfs/go-libp2p-peer"
ctxio "github.com/jbenet/go-context/io"
inet "github.com/libp2p/go-libp2p/p2p/net"
context "golang.org/x/net/context"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p/p2p/net"
)

var dhtReadMessageTimeout = time.Minute
Expand Down
Loading

0 comments on commit 596b945

Please sign in to comment.