Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update for the routing refactor #148

Merged
merged 4 commits into from
Jun 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.0.4: QmSBxn1eLMdViZRDGW9rRHRYwtqq5bqUgipqTMPuTim616
4.1.0: Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd
66 changes: 39 additions & 27 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"sync"
"time"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
providers "github.com/libp2p/go-libp2p-kad-dht/providers"
routing "github.com/libp2p/go-libp2p-routing"

proto "github.com/gogo/protobuf/proto"
cid "github.com/ipfs/go-cid"
Expand All @@ -28,6 +28,7 @@ import (
protocol "github.com/libp2p/go-libp2p-protocol"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
routing "github.com/libp2p/go-libp2p-routing"
base32 "github.com/whyrusleeping/base32"
)

Expand All @@ -54,8 +55,7 @@ type IpfsDHT struct {

birth time.Time // When this peer started up

Validator record.Validator // record validator funcs
Selector record.Selector // record selection funcs
Validator record.Validator

ctx context.Context
proc goprocess.Process
Expand All @@ -66,23 +66,13 @@ type IpfsDHT struct {
plk sync.Mutex
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := NewDHTClient(ctx, h, dstore)

h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream)

return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := makeDHT(ctx, h, dstore)
// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand All @@ -94,10 +84,35 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
})

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

dht.Validator["pk"] = record.PublicKeyValidator
dht.Selector["pk"] = record.PublicKeySelector
if !cfg.Client {
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream)
}
return dht, nil
}

// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, opts.Datastore(dstore))
if err != nil {
panic(err)
}
return dht
}

// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
if err != nil {
panic(err)
}
return dht
}

Expand All @@ -122,9 +137,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: rt,

Validator: make(record.Validator),
Selector: make(record.Selector),
}
}

Expand Down Expand Up @@ -176,7 +188,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string)
log.Debug("getValueOrPeers: got value")

// make sure record is valid.
err = dht.Validator.VerifyRecord(record)
err = dht.Validator.Validate(record.GetKey(), record.GetValue())
if err != nil {
log.Info("Received invalid record! (discarded)")
// return a sentinal to signify an invalid record was received
Expand Down Expand Up @@ -239,7 +251,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
return nil, err
}

err = dht.Validator.VerifyRecord(rec)
err = dht.Validator.Validate(rec.GetKey(), rec.GetValue())
if err != nil {
log.Debugf("local record verify failed: %s (discarded)", err)
return nil, err
Expand Down
88 changes: 41 additions & 47 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (
"testing"
"time"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
kb "github.com/libp2p/go-libp2p-kbucket"
netutil "github.com/libp2p/go-libp2p-netutil"
Expand All @@ -41,19 +40,47 @@ func init() {
}
}

func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
h := bhost.New(netutil.GenSwarmNetwork(t, ctx))
type blankValidator struct{}

func (blankValidator) Validate(_ string, _ []byte) error { return nil }
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

dss := dssync.MutexWrap(ds.NewMapDatastore())
var d *IpfsDHT
if client {
d = NewDHTClient(ctx, h, dss)
} else {
d = NewDHT(ctx, h, dss)
type testValidator struct{}

func (testValidator) Select(_ string, bs [][]byte) (int, error) {
index := -1
for i, b := range bs {
if bytes.Compare(b, []byte("newer")) == 0 {
index = i
} else if bytes.Compare(b, []byte("valid")) == 0 {
if index == -1 {
index = i
}
}
}
if index == -1 {
return -1, errors.New("no rec found")
}
return index, nil
}
func (testValidator) Validate(_ string, b []byte) error {
if bytes.Compare(b, []byte("expired")) == 0 {
return errors.New("expired")
}
return nil
}

d.Validator["v"] = func(*record.ValidationRecord) error { return nil }
d.Selector["v"] = func(_ string, bs [][]byte) (int, error) { return 0, nil }
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
d, err := New(
ctx,
bhost.New(netutil.GenSwarmNetwork(t, ctx)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
)

if err != nil {
t.Fatal(err)
}
return d
}

Expand Down Expand Up @@ -148,14 +175,6 @@ func TestValueGetSet(t *testing.T) {
defer dhtA.host.Close()
defer dhtB.host.Close()

vf := func(*record.ValidationRecord) error { return nil }
nulsel := func(_ string, bs [][]byte) (int, error) { return 0, nil }

dhtA.Validator["v"] = vf
dhtB.Validator["v"] = vf
dhtA.Selector["v"] = nulsel
dhtB.Selector["v"] = nulsel

connect(t, ctx, dhtA, dhtB)

log.Debug("adding value on: ", dhtA.self)
Expand Down Expand Up @@ -203,33 +222,8 @@ func TestValueSetInvalid(t *testing.T) {
defer dhtA.host.Close()
defer dhtB.host.Close()

vf := func(r *record.ValidationRecord) error {
if bytes.Compare(r.Value, []byte("expired")) == 0 {
return errors.New("expired")
}
return nil
}
nulsel := func(k string, bs [][]byte) (int, error) {
index := -1
for i, b := range bs {
if bytes.Compare(b, []byte("newer")) == 0 {
index = i
} else if bytes.Compare(b, []byte("valid")) == 0 {
if index == -1 {
index = i
}
}
}
if index == -1 {
return -1, errors.New("no rec found")
}
return index, nil
}

dhtA.Validator["v"] = vf
dhtB.Validator["v"] = vf
dhtA.Selector["v"] = nulsel
dhtB.Selector["v"] = nulsel
dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}

connect(t, ctx, dhtA, dhtB)

Expand Down
26 changes: 16 additions & 10 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"time"

ggio "github.com/gogo/protobuf/io"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
inet "github.com/libp2p/go-libp2p-net"
Expand All @@ -31,8 +29,10 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()

tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
Expand Down Expand Up @@ -148,8 +148,10 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

for _, p := range hosts {
d.Update(ctx, p.ID())
Expand Down Expand Up @@ -225,8 +227,10 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()

tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

for i := 1; i < 5; i++ {
d.Update(ctx, hosts[i].ID())
Expand Down Expand Up @@ -292,8 +296,10 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d, err := New(ctx, hosts[0])
if err != nil {
t.Fatal(err)
}

d.Update(ctx, hosts[1].ID())

Expand Down
15 changes: 10 additions & 5 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,26 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
eip.Done()
}()

dskey := convertToDsKey(pmes.GetKey())

rec := pmes.GetRecord()
if rec == nil {
log.Infof("Got nil record from: %s", p.Pretty())
return nil, errors.New("nil record")
}

if pmes.GetKey() != rec.GetKey() {
return nil, errors.New("put key doesn't match record key")
}

cleanRecord(rec)

// Make sure the record is valid (not expired, valid signature etc)
if err = dht.Validator.VerifyRecord(rec); err != nil {
if err = dht.Validator.Validate(rec.GetKey(), rec.GetValue()); err != nil {
log.Warningf("Bad dht record in PUT from: %s. %s", p.Pretty(), err)
return nil, err
}

dskey := convertToDsKey(rec.GetKey())

// Make sure the new record is "better" than the record we have locally.
// This prevents a record with for example a lower sequence number from
// overwriting a record with a higher sequence number.
Expand All @@ -188,7 +193,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess

if existing != nil {
recs := [][]byte{rec.GetValue(), existing.GetValue()}
i, err := dht.Selector.BestRecord(pmes.GetKey(), recs)
i, err := dht.Validator.Select(rec.GetKey(), recs)
if err != nil {
log.Warningf("Bad dht record in PUT from %s: %s", p.Pretty(), err)
return nil, err
Expand Down Expand Up @@ -237,7 +242,7 @@ func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error)
return nil, nil
}

err = dht.Validator.VerifyRecord(rec)
err = dht.Validator.Validate(rec.GetKey(), rec.GetValue())
if err != nil {
// Invalid record in datastore, probably expired but don't return an error,
// we'll just overwrite it
Expand Down
Loading