From cad57471f57ef82eb5227da02c8ad5bb63c2eea8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 3 May 2018 14:19:53 -0700 Subject: [PATCH 1/4] update for the routing refactor GetValues was very DHT specific so the routing interface has been updated to remove that function. Instead, it has introduced general-purpose options. This is a minimal alternative to #141 to avoid bundling too many changes together. --- options.go | 29 +++++++++++++++++++++++++++++ package.json | 4 ++-- records.go | 9 ++------- routing.go | 34 ++++++++++++++++++++++++++-------- 4 files changed, 59 insertions(+), 17 deletions(-) create mode 100644 options.go diff --git a/options.go b/options.go new file mode 100644 index 000000000..e2b7af1f9 --- /dev/null +++ b/options.go @@ -0,0 +1,29 @@ +package dht + +import ( + ropts "github.com/libp2p/go-libp2p-routing/options" +) + +type quorumOptionKey struct{} + +// Quorum is a DHT option that tells the DHT how many peers it needs to get +// values from before returning the best one. +// +// Default: 16 +func Quorum(n int) ropts.Option { + return func(opts *ropts.Options) error { + if opts.Other == nil { + opts.Other = make(map[interface{}]interface{}, 1) + } + opts.Other[quorumOptionKey{}] = n + return nil + } +} + +func getQuorum(opts *ropts.Options) int { + responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int) + if !ok { + responsesNeeded = 16 + } + return responsesNeeded +} diff --git a/package.json b/package.json index 74a133af0..ca81103b7 100644 --- a/package.json +++ b/package.json @@ -84,9 +84,9 @@ }, { "author": "whyrusleeping", - "hash": "QmZix3EdeAdc4wnRksRXWEQ6kbqiFAP16h3Sq9JnEiP71N", + "hash": "QmUHRKTeaoASDvDj7cTAXsmjAY7KQ13ErtzkQHZQq6uFUz", "name": "go-libp2p-routing", - "version": "2.2.22" + "version": "2.3.0" }, { "author": "whyrusleeping", diff --git a/records.go b/records.go index 20ab78c7d..29dae735d 100644 --- a/records.go +++ b/records.go @@ -78,17 +78,12 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK // Only retrieve one value, because the public key is immutable // so there's no need to retrieve multiple versions pkkey := routing.KeyForPublicKey(p) - vals, err := dht.GetValues(ctx, pkkey, 1) + val, err := dht.GetValue(ctx, pkkey, Quorum(1)) if err != nil { return nil, err } - if len(vals) == 0 || vals[0].Val == nil { - log.Debugf("Could not find public key for %v in DHT", p) - return nil, routing.ErrNotFound - } - - pubk, err := ci.UnmarshalPublicKey(vals[0].Val) + pubk, err := ci.UnmarshalPublicKey(val) if err != nil { log.Errorf("Could not unmarshall public key retrieved from DHT for %v", p) return nil, err diff --git a/routing.go b/routing.go index 9db9d2646..f44c7c89e 100644 --- a/routing.go +++ b/routing.go @@ -21,6 +21,7 @@ import ( record "github.com/libp2p/go-libp2p-record" routing "github.com/libp2p/go-libp2p-routing" notif "github.com/libp2p/go-libp2p-routing/notifications" + ropts "github.com/libp2p/go-libp2p-routing/options" ) // asyncQueryBuffer is the size of buffered channels in async queries. This @@ -35,7 +36,7 @@ var asyncQueryBuffer = 10 // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT -func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err error) { +func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) { eip := log.EventBegin(ctx, "PutValue") defer func() { eip.Append(loggableKey(key)) @@ -80,8 +81,14 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err return nil } +// RecvdVal stores a value and the peer from which we got the value. +type RecvdVal struct { + Val []byte + From peer.ID +} + // GetValue searches for the value corresponding to given Key. -func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err error) { +func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) { eip := log.EventBegin(ctx, "GetValue") defer func() { eip.Append(loggableKey(key)) @@ -93,7 +100,17 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - vals, err := dht.GetValues(ctx, key, 16) + var cfg ropts.Options + if err := cfg.Apply(opts...); err != nil { + return nil, err + } + + responsesNeeded := 0 + if !cfg.Offline { + responsesNeeded = getQuorum(&cfg) + } + + vals, err := dht.GetValues(ctx, key, responsesNeeded) if err != nil { return nil, err } @@ -124,7 +141,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err for _, v := range vals { // if someone sent us a different 'less-valid' record, lets correct them if !bytes.Equal(v.Val, best) { - go func(v routing.RecvdVal) { + go func(v RecvdVal) { if v.From == dht.self { err := dht.putLocal(key, fixupRec) if err != nil { @@ -145,7 +162,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err return best, nil } -func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []routing.RecvdVal, err error) { +// GetValues gets nvals values corresponding to the given key. +func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { eip := log.EventBegin(ctx, "GetValues") defer func() { eip.Append(loggableKey(key)) @@ -154,7 +172,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r } eip.Done() }() - vals := make([]routing.RecvdVal, 0, nvals) + vals := make([]RecvdVal, 0, nvals) var valslock sync.Mutex // If we have it local, don't bother doing an RPC! @@ -163,7 +181,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r // TODO: this is tricky, we don't always want to trust our own value // what if the authoritative source updated it? log.Debug("have it locally") - vals = append(vals, routing.RecvdVal{ + vals = append(vals, RecvdVal{ Val: lrec.GetValue(), From: dht.self, }) @@ -212,7 +230,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r res := &dhtQueryResult{closerPeers: peers} if rec.GetValue() != nil || err == errInvalidRecord { - rv := routing.RecvdVal{ + rv := RecvdVal{ Val: rec.GetValue(), From: p, } From 3befc403d71033fae607d1fb28933c1d72c382ae Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 3 May 2018 16:36:48 -0700 Subject: [PATCH 2/4] require that the validator be explicitly passed in Note: this does mean that the DHT won't work with peer keys by default and that the constructor signature changes. Given all the changes that'll come with the libp2p refactor, I don't feel too bad about this. --- dht.go | 21 ++++++-------- dht_test.go | 78 +++++++++++++++++++++++++--------------------------- ext_test.go | 8 +++--- handlers.go | 15 ++++++---- package.json | 5 ++-- routing.go | 2 +- 6 files changed, 64 insertions(+), 65 deletions(-) diff --git a/dht.go b/dht.go index dacfed534..e7e59ccd8 100644 --- a/dht.go +++ b/dht.go @@ -54,8 +54,7 @@ type IpfsDHT struct { birth time.Time // When this peer started up - Validator record.Validator // record validator funcs - Selector record.Selector // record selection funcs + Validator record.Validator ctx context.Context proc goprocess.Process @@ -69,8 +68,8 @@ type IpfsDHT struct { // NewDHT creates a new DHT object with the given peer as the 'local' host. // IpfsDHT's initialized with this function will respond to DHT requests, // whereas IpfsDHT's initialized with NewDHTClient will not. -func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { - dht := NewDHTClient(ctx, h, dstore) +func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching, validator record.Validator) *IpfsDHT { + dht := NewDHTClient(ctx, h, dstore, validator) h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream) @@ -81,7 +80,8 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { // NewDHTClient creates a new DHT object with the given peer as the 'local' // host. IpfsDHT clients initialized with this function will not respond to DHT // requests. If you need a peer to respond to DHT requests, use NewDHT instead. -func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { +// NewDHTClient creates a new DHT object with the given peer as the 'local' host +func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching, validator record.Validator) *IpfsDHT { dht := makeDHT(ctx, h, dstore) // register for network notifs. @@ -94,9 +94,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT }) dht.proc.AddChild(dht.providers.Process()) - - dht.Validator["pk"] = record.PublicKeyValidator - dht.Selector["pk"] = record.PublicKeySelector + dht.Validator = validator return dht } @@ -122,9 +120,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { providers: providers.NewProviderManager(ctx, h.ID(), dstore), birth: time.Now(), routingTable: rt, - - Validator: make(record.Validator), - Selector: make(record.Selector), } } @@ -176,7 +171,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) log.Debug("getValueOrPeers: got value") // make sure record is valid. - err = dht.Validator.VerifyRecord(record) + err = dht.Validator.Validate(record.GetKey(), record.GetValue()) if err != nil { log.Info("Received invalid record! (discarded)") // return a sentinal to signify an invalid record was received @@ -239,7 +234,7 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { return nil, err } - err = dht.Validator.VerifyRecord(rec) + err = dht.Validator.Validate(rec.GetKey(), rec.GetValue()) if err != nil { log.Debugf("local record verify failed: %s (discarded)", err) return nil, err diff --git a/dht_test.go b/dht_test.go index 7a3b939ac..782bfbeb5 100644 --- a/dht_test.go +++ b/dht_test.go @@ -41,19 +41,50 @@ func init() { } } +type blankValidator struct{} + +func (blankValidator) Validate(_ string, _ []byte) error { return nil } +func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } + +type testValidator struct{} + +func (testValidator) Select(_ string, bs [][]byte) (int, error) { + index := -1 + for i, b := range bs { + if bytes.Compare(b, []byte("newer")) == 0 { + index = i + } else if bytes.Compare(b, []byte("valid")) == 0 { + if index == -1 { + index = i + } + } + } + if index == -1 { + return -1, errors.New("no rec found") + } + return index, nil +} +func (testValidator) Validate(_ string, b []byte) error { + if bytes.Compare(b, []byte("expired")) == 0 { + return errors.New("expired") + } + return nil +} + func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { h := bhost.New(netutil.GenSwarmNetwork(t, ctx)) dss := dssync.MutexWrap(ds.NewMapDatastore()) var d *IpfsDHT + validator := record.NamespacedValidator{ + "v": blankValidator{}, + "pk": record.PublicKeyValidator{}, + } if client { - d = NewDHTClient(ctx, h, dss) + d = NewDHTClient(ctx, h, dss, validator) } else { - d = NewDHT(ctx, h, dss) + d = NewDHT(ctx, h, dss, validator) } - - d.Validator["v"] = func(*record.ValidationRecord) error { return nil } - d.Selector["v"] = func(_ string, bs [][]byte) (int, error) { return 0, nil } return d } @@ -148,14 +179,6 @@ func TestValueGetSet(t *testing.T) { defer dhtA.host.Close() defer dhtB.host.Close() - vf := func(*record.ValidationRecord) error { return nil } - nulsel := func(_ string, bs [][]byte) (int, error) { return 0, nil } - - dhtA.Validator["v"] = vf - dhtB.Validator["v"] = vf - dhtA.Selector["v"] = nulsel - dhtB.Selector["v"] = nulsel - connect(t, ctx, dhtA, dhtB) log.Debug("adding value on: ", dhtA.self) @@ -203,33 +226,8 @@ func TestValueSetInvalid(t *testing.T) { defer dhtA.host.Close() defer dhtB.host.Close() - vf := func(r *record.ValidationRecord) error { - if bytes.Compare(r.Value, []byte("expired")) == 0 { - return errors.New("expired") - } - return nil - } - nulsel := func(k string, bs [][]byte) (int, error) { - index := -1 - for i, b := range bs { - if bytes.Compare(b, []byte("newer")) == 0 { - index = i - } else if bytes.Compare(b, []byte("valid")) == 0 { - if index == -1 { - index = i - } - } - } - if index == -1 { - return -1, errors.New("no rec found") - } - return index, nil - } - - dhtA.Validator["v"] = vf - dhtB.Validator["v"] = vf - dhtA.Selector["v"] = nulsel - dhtB.Selector["v"] = nulsel + dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{} + dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{} connect(t, ctx, dhtA, dhtB) diff --git a/ext_test.go b/ext_test.go index 2a5340b32..aae1a8b24 100644 --- a/ext_test.go +++ b/ext_test.go @@ -32,7 +32,7 @@ func TestGetFailures(t *testing.T) { hosts := mn.Hosts() tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds) + d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) d.Update(ctx, hosts[1].ID()) // Reply with failures to every message @@ -149,7 +149,7 @@ func TestNotFound(t *testing.T) { } hosts := mn.Hosts() tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds) + d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) for _, p := range hosts { d.Update(ctx, p.ID()) @@ -226,7 +226,7 @@ func TestLessThanKResponses(t *testing.T) { hosts := mn.Hosts() tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds) + d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) for i := 1; i < 5; i++ { d.Update(ctx, hosts[i].ID()) @@ -293,7 +293,7 @@ func TestMultipleQueries(t *testing.T) { } hosts := mn.Hosts() tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds) + d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) d.Update(ctx, hosts[1].ID()) diff --git a/handlers.go b/handlers.go index f95f0a2e9..1e48f6e7e 100644 --- a/handlers.go +++ b/handlers.go @@ -163,21 +163,26 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess eip.Done() }() - dskey := convertToDsKey(pmes.GetKey()) - rec := pmes.GetRecord() if rec == nil { log.Infof("Got nil record from: %s", p.Pretty()) return nil, errors.New("nil record") } + + if pmes.GetKey() != rec.GetKey() { + return nil, errors.New("put key doesn't match record key") + } + cleanRecord(rec) // Make sure the record is valid (not expired, valid signature etc) - if err = dht.Validator.VerifyRecord(rec); err != nil { + if err = dht.Validator.Validate(rec.GetKey(), rec.GetValue()); err != nil { log.Warningf("Bad dht record in PUT from: %s. %s", p.Pretty(), err) return nil, err } + dskey := convertToDsKey(rec.GetKey()) + // Make sure the new record is "better" than the record we have locally. // This prevents a record with for example a lower sequence number from // overwriting a record with a higher sequence number. @@ -188,7 +193,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess if existing != nil { recs := [][]byte{rec.GetValue(), existing.GetValue()} - i, err := dht.Selector.BestRecord(pmes.GetKey(), recs) + i, err := dht.Validator.Select(rec.GetKey(), recs) if err != nil { log.Warningf("Bad dht record in PUT from %s: %s", p.Pretty(), err) return nil, err @@ -237,7 +242,7 @@ func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) return nil, nil } - err = dht.Validator.VerifyRecord(rec) + err = dht.Validator.Validate(rec.GetKey(), rec.GetValue()) if err != nil { // Invalid record in datastore, probably expired but don't return an error, // we'll just overwrite it diff --git a/package.json b/package.json index ca81103b7..821532d1f 100644 --- a/package.json +++ b/package.json @@ -72,9 +72,9 @@ }, { "author": "whyrusleeping", - "hash": "QmZ9V14gpwKsTUG7y5mHZDnHSF4Fa4rKsXNx7jSTEQ4JWs", + "hash": "QmTUyK82BVPA6LmSzEJpfEunk9uBaQzWtMsNP917tVj4sT", "name": "go-libp2p-record", - "version": "4.0.1" + "version": "4.1.0" }, { "author": "whyrusleeping", @@ -168,3 +168,4 @@ "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", "version": "4.0.4" } + diff --git a/routing.go b/routing.go index f44c7c89e..4f7f34423 100644 --- a/routing.go +++ b/routing.go @@ -125,7 +125,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti return nil, routing.ErrNotFound } - i, err := dht.Selector.BestRecord(key, recs) + i, err := dht.Validator.Select(key, recs) if err != nil { return nil, err } From c0d3351b8d25dc56887037141e4bf6dd5011e8ab Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 15 May 2018 19:00:16 +0100 Subject: [PATCH 3/4] revert interface changes and add options Instead of changing the existing constructors, add a new DHT constructor that takes options (and add DHT options). --- dht.go | 57 +++++++++++++-------- dht_test.go | 24 ++++----- ext_test.go | 26 ++++++---- opts/options.go | 87 ++++++++++++++++++++++++++++++++ options.go => routing_options.go | 0 5 files changed, 150 insertions(+), 44 deletions(-) create mode 100644 opts/options.go rename options.go => routing_options.go (100%) diff --git a/dht.go b/dht.go index e7e59ccd8..d304c3a1b 100644 --- a/dht.go +++ b/dht.go @@ -10,9 +10,9 @@ import ( "sync" "time" + opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" providers "github.com/libp2p/go-libp2p-kad-dht/providers" - routing "github.com/libp2p/go-libp2p-routing" proto "github.com/gogo/protobuf/proto" cid "github.com/ipfs/go-cid" @@ -28,6 +28,7 @@ import ( protocol "github.com/libp2p/go-libp2p-protocol" record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" + routing "github.com/libp2p/go-libp2p-routing" base32 "github.com/whyrusleeping/base32" ) @@ -65,24 +66,13 @@ type IpfsDHT struct { plk sync.Mutex } -// NewDHT creates a new DHT object with the given peer as the 'local' host. -// IpfsDHT's initialized with this function will respond to DHT requests, -// whereas IpfsDHT's initialized with NewDHTClient will not. -func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching, validator record.Validator) *IpfsDHT { - dht := NewDHTClient(ctx, h, dstore, validator) - - h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) - h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream) - - return dht -} - -// NewDHTClient creates a new DHT object with the given peer as the 'local' -// host. IpfsDHT clients initialized with this function will not respond to DHT -// requests. If you need a peer to respond to DHT requests, use NewDHT instead. -// NewDHTClient creates a new DHT object with the given peer as the 'local' host -func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching, validator record.Validator) *IpfsDHT { - dht := makeDHT(ctx, h, dstore) +// New creates a new DHT with the specified host and options. +func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) { + var cfg opts.Options + if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil { + return nil, err + } + dht := makeDHT(ctx, h, cfg.Datastore) // register for network notifs. dht.host.Network().Notify((*netNotifiee)(dht)) @@ -94,8 +84,35 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching, validato }) dht.proc.AddChild(dht.providers.Process()) - dht.Validator = validator + dht.Validator = cfg.Validator + if !cfg.Client { + h.SetStreamHandler(ProtocolDHT, dht.handleNewStream) + h.SetStreamHandler(ProtocolDHTOld, dht.handleNewStream) + } + return dht, nil +} + +// NewDHT creates a new DHT object with the given peer as the 'local' host. +// IpfsDHT's initialized with this function will respond to DHT requests, +// whereas IpfsDHT's initialized with NewDHTClient will not. +func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { + dht, err := New(ctx, h, opts.Datastore(dstore)) + if err != nil { + panic(err) + } + return dht +} + +// NewDHTClient creates a new DHT object with the given peer as the 'local' +// host. IpfsDHT clients initialized with this function will not respond to DHT +// requests. If you need a peer to respond to DHT requests, use NewDHT instead. +// NewDHTClient creates a new DHT object with the given peer as the 'local' host +func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT { + dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true)) + if err != nil { + panic(err) + } return dht } diff --git a/dht_test.go b/dht_test.go index 782bfbeb5..3d22920fd 100644 --- a/dht_test.go +++ b/dht_test.go @@ -11,11 +11,10 @@ import ( "testing" "time" + opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" u "github.com/ipfs/go-ipfs-util" kb "github.com/libp2p/go-libp2p-kbucket" netutil "github.com/libp2p/go-libp2p-netutil" @@ -72,18 +71,15 @@ func (testValidator) Validate(_ string, b []byte) error { } func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { - h := bhost.New(netutil.GenSwarmNetwork(t, ctx)) - - dss := dssync.MutexWrap(ds.NewMapDatastore()) - var d *IpfsDHT - validator := record.NamespacedValidator{ - "v": blankValidator{}, - "pk": record.PublicKeyValidator{}, - } - if client { - d = NewDHTClient(ctx, h, dss, validator) - } else { - d = NewDHT(ctx, h, dss, validator) + d, err := New( + ctx, + bhost.New(netutil.GenSwarmNetwork(t, ctx)), + opts.Client(client), + opts.NamespacedValidator("v", blankValidator{}), + ) + + if err != nil { + t.Fatal(err) } return d } diff --git a/ext_test.go b/ext_test.go index aae1a8b24..90e48187a 100644 --- a/ext_test.go +++ b/ext_test.go @@ -8,8 +8,6 @@ import ( "time" ggio "github.com/gogo/protobuf/io" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" u "github.com/ipfs/go-ipfs-util" pb "github.com/libp2p/go-libp2p-kad-dht/pb" inet "github.com/libp2p/go-libp2p-net" @@ -31,8 +29,10 @@ func TestGetFailures(t *testing.T) { } hosts := mn.Hosts() - tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) + d, err := New(ctx, hosts[0]) + if err != nil { + t.Fatal(err) + } d.Update(ctx, hosts[1].ID()) // Reply with failures to every message @@ -148,8 +148,10 @@ func TestNotFound(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) + d, err := New(ctx, hosts[0]) + if err != nil { + t.Fatal(err) + } for _, p := range hosts { d.Update(ctx, p.ID()) @@ -225,8 +227,10 @@ func TestLessThanKResponses(t *testing.T) { } hosts := mn.Hosts() - tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) + d, err := New(ctx, hosts[0]) + if err != nil { + t.Fatal(err) + } for i := 1; i < 5; i++ { d.Update(ctx, hosts[i].ID()) @@ -292,8 +296,10 @@ func TestMultipleQueries(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - tsds := dssync.MutexWrap(ds.NewMapDatastore()) - d := NewDHT(ctx, hosts[0], tsds, record.NamespacedValidator{}) + d, err := New(ctx, hosts[0]) + if err != nil { + t.Fatal(err) + } d.Update(ctx, hosts[1].ID()) diff --git a/opts/options.go b/opts/options.go new file mode 100644 index 000000000..aec6a8c82 --- /dev/null +++ b/opts/options.go @@ -0,0 +1,87 @@ +package dhtopts + +import ( + "fmt" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + record "github.com/libp2p/go-libp2p-record" +) + +// Options is a structure containing all the options that can be used when constructing a DHT. +type Options struct { + Datastore ds.Batching + Validator record.Validator + Client bool +} + +// Apply applies the given options to this Option +func (o *Options) Apply(opts ...Option) error { + for i, opt := range opts { + if err := opt(o); err != nil { + return fmt.Errorf("dht option %d failed: %s", i, err) + } + } + return nil +} + +// Option DHT option type. +type Option func(*Options) error + +// Defaults are the default DHT options. This option will be automatically +// prepended to any options you pass to the DHT constructor. +var Defaults = func(o *Options) error { + o.Validator = record.NamespacedValidator{ + "pk": record.PublicKeyValidator{}, + } + o.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) + return nil +} + +// Datastore configures the DHT to use the specified datastore. +// +// Defaults to an in-memory (temporary) map. +func Datastore(ds ds.Batching) Option { + return func(o *Options) error { + o.Datastore = ds + return nil + } +} + +// Client configures whether or not the DHT operates in client-only mode. +// +// Defaults to false. +func Client(only bool) Option { + return func(o *Options) error { + o.Client = only + return nil + } +} + +// Validator configures the DHT to use the specified validator. +// +// Defaults to a namespaced validator that can only validate public keys. +func Validator(v record.Validator) Option { + return func(o *Options) error { + o.Validator = v + return nil + } +} + +// NamespacedValidator adds a validator namespaced under `ns`. This option fails +// if the DHT is not using a `record.NamespacedValidator` as it's validator (it +// uses one by default but this can be overridden with the `Validator` option). +// +// Example: Given a validator registered as `NamespacedValidator("ipns", +// myValidator)`, all records with keys starting with `/ipns/` will be validated +// with `myValidator`. +func NamespacedValidator(ns string, v record.Validator) Option { + return func(o *Options) error { + nsval, ok := o.Validator.(record.NamespacedValidator) + if !ok { + return fmt.Errorf("can only add namespaced validators to a NamespacedValidator") + } + nsval[ns] = v + return nil + } +} diff --git a/options.go b/routing_options.go similarity index 100% rename from options.go rename to routing_options.go From 093ebc54cbe0261773d34a742bc4cdb7dfda3bb8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 31 May 2018 17:55:19 -0700 Subject: [PATCH 4/4] gx publish 4.1.0 --- .gx/lastpubver | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.gx/lastpubver b/.gx/lastpubver index f3b591abb..f993bd2d9 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -4.0.4: QmSBxn1eLMdViZRDGW9rRHRYwtqq5bqUgipqTMPuTim616 +4.1.0: Qmd3jqhBQFvhfBNTSJMQL15GgyVMpdxKTta69Napvx6Myd diff --git a/package.json b/package.json index 821532d1f..5f3c1e239 100644 --- a/package.json +++ b/package.json @@ -166,6 +166,6 @@ "license": "MIT", "name": "go-libp2p-kad-dht", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "4.0.4" + "version": "4.1.0" }