Skip to content

Commit

Permalink
replace GetValues with a GetValue that takes options
Browse files Browse the repository at this point in the history
Changes for: libp2p/go-libp2p-routing#21

(also fixes some test issues)
  • Loading branch information
Stebalien committed Apr 26, 2018
1 parent d38a447 commit b2a323e
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 73 deletions.
6 changes: 2 additions & 4 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dht

import (
"context"
"io"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -35,9 +34,8 @@ func TestGetFailures(t *testing.T) {
d := NewDHT(ctx, hosts[0], tsds)
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
s.Close()
// hang forever
})

// This one should time out
Expand All @@ -48,7 +46,7 @@ func TestGetFailures(t *testing.T) {
err = merr[0]
}

if err != io.EOF {
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
}
} else {
Expand Down
29 changes: 29 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package dht

import (
ropts "github.com/libp2p/go-libp2p-routing/options"
)

type quorumOptionKey struct{}

// Quorum is a DHT option that tells the DHT how many peers it needs to get
// values from before returning the best one.
//
// Default: 16
func Quorum(n int) ropts.Option {
return func(opts *ropts.Options) error {
if opts.Other == nil {
opts.Other = make(map[interface{}]interface{}, 1)
}
opts.Other[quorumOptionKey{}] = n
return nil
}
}

func getQuorum(opts *ropts.Options) int64 {
responsesNeeded, ok := opts.Other[quorumOptionKey{}].(int)
if !ok {
responsesNeeded = 16
}
return int64(responsesNeeded)
}
9 changes: 2 additions & 7 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,12 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK
// Only retrieve one value, because the public key is immutable
// so there's no need to retrieve multiple versions
pkkey := routing.KeyForPublicKey(p)
vals, err := dht.GetValues(ctx, pkkey, 1)
val, err := dht.GetValue(ctx, pkkey, Quorum(1))
if err != nil {
return nil, err
}

if len(vals) == 0 || vals[0].Val == nil {
log.Debugf("Could not find public key for %v in DHT", p)
return nil, routing.ErrNotFound
}

pubk, err := ci.UnmarshalPublicKey(vals[0].Val)
pubk, err := ci.UnmarshalPublicKey(val)
if err != nil {
log.Errorf("Could not unmarshall public key retrieved from DHT for %v", p)
return nil, err
Expand Down
166 changes: 104 additions & 62 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"

proto "github.com/gogo/protobuf/proto"
Expand All @@ -21,6 +22,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
notif "github.com/libp2p/go-libp2p-routing/notifications"
ropts "github.com/libp2p/go-libp2p-routing/options"
)

// asyncQueryBuffer is the size of buffered channels in async queries. This
Expand All @@ -35,7 +37,7 @@ var asyncQueryBuffer = 10

// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err error) {
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, options ...ropts.Option) (err error) {
eip := log.EventBegin(ctx, "PutValue")
defer func() {
eip.Append(loggableKey(key))
Expand All @@ -46,6 +48,8 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err
}()
log.Debugf("PutValue %s", key)

// TODO: How to handle the offline option?

rec := record.MakePutRecord(key, value)
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
err = dht.putLocal(key, rec)
Expand Down Expand Up @@ -81,7 +85,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err
}

// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err error) {
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
eip := log.EventBegin(ctx, "GetValue")
defer func() {
eip.Append(loggableKey(key))
Expand All @@ -93,39 +97,55 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

vals, err := dht.GetValues(ctx, key, 16)
if err != nil {
var cfg ropts.Options
if err := cfg.Apply(opts...); err != nil {
return nil, err
}

recs := make([][]byte, 0, len(vals))
for _, v := range vals {
if v.Val != nil {
recs = append(recs, v.Val)
}
}
if len(recs) == 0 {
return nil, routing.ErrNotFound
}

i, err := dht.Selector.BestRecord(key, recs)
results, err := dht.getValues(ctx, key, &cfg)
if err != nil {
return nil, err
}

best := recs[i]
log.Debugf("GetValue %v %v", key, best)
if best == nil {
log.Errorf("GetValue yielded correct record with nil value.")
return nil, routing.ErrNotFound
var outdatedPeers, currentPeers []peer.ID

var best []byte
for result := range results {
switch {
case result.val == nil:
outdatedPeers = append(outdatedPeers, result.from)
case best == nil:
best = result.val
fallthrough
case bytes.Equal(result.val, best):
currentPeers = append(currentPeers, result.from)
default:
i, err := dht.Selector.BestRecord(key, [][]byte{best, result.val})
if err != nil {
log.Error(err)
return nil, err
}
switch i {
case 0:
outdatedPeers = append(outdatedPeers, result.from)
case 1:
outdatedPeers = append(outdatedPeers, currentPeers...)
currentPeers = append(currentPeers[:0], result.from)
default:
err := fmt.Errorf("invalid bad selector for key: %s", loggableKey(key))
log.Error(err)
return nil, err
}
}
}

fixupRec := record.MakePutRecord(key, best)
for _, v := range vals {
// if someone sent us a different 'less-valid' record, lets correct them
if !bytes.Equal(v.Val, best) {
go func(v routing.RecvdVal) {
if v.From == dht.self {
// if someone sent us a different 'less-valid' record, lets correct them
if best != nil && len(outdatedPeers) > 0 {
fixupRec := record.MakePutRecord(key, best)
for _, p := range outdatedPeers {
// TODO: Use a worker.
go func(p peer.ID) {
if p == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
log.Error("Error correcting local dht entry:", err)
Expand All @@ -134,45 +154,60 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err err
}
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
err := dht.putValueToPeer(ctx, p, key, fixupRec)
if err != nil {
log.Error("Error correcting DHT entry: ", err)
}
}(v)
}(p)
}
}

if err := ctx.Err(); err != nil {
return best, err
}
if best == nil {
return nil, routing.ErrNotFound
}

return best, nil
}

func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []routing.RecvdVal, err error) {
eip := log.EventBegin(ctx, "GetValues")
type recvdVal struct {
val []byte
from peer.ID
}

func (dht *IpfsDHT) getValues(ctx context.Context, key string, opts *ropts.Options) (_ <-chan recvdVal, _err error) {
eip := log.EventBegin(ctx, "getValues")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
if _err != nil {
eip.SetError(_err)
}
eip.Done()
}()
vals := make([]routing.RecvdVal, 0, nvals)
var valslock sync.Mutex

// If we have it local, don't bother doing an RPC!
if err := ctx.Err(); err != nil {
return nil, err
}

vals := make(chan recvdVal, 1)

responsesNeeded := getQuorum(opts)

// If we have it locally, don't bother doing an RPC!
lrec, err := dht.getLocal(key)
if err == nil {
// TODO: this is tricky, we don't always want to trust our own value
// what if the authoritative source updated it?
log.Debug("have it locally")
vals = append(vals, routing.RecvdVal{
Val: lrec.GetValue(),
From: dht.self,
})

if nvals <= 1 {
return vals, nil
vals <- recvdVal{
val: lrec.GetValue(),
from: dht.self,
}
} else if nvals == 0 {
return nil, err
responsesNeeded--
}

if opts.Offline || responsesNeeded <= 0 {
close(vals)
return vals, nil
}

// get closest peers in the routing table
Expand Down Expand Up @@ -212,18 +247,17 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
res := &dhtQueryResult{closerPeers: peers}

if rec.GetValue() != nil || err == errInvalidRecord {
rv := routing.RecvdVal{
Val: rec.GetValue(),
From: p,
select {
case vals <- recvdVal{
val: rec.GetValue(),
from: p,
}:
case <-ctx.Done():
return nil, ctx.Err()
}
valslock.Lock()
vals = append(vals, rv)

// If we have collected enough records, we're done
if len(vals) >= nvals {
if atomic.AddInt64(&responsesNeeded, -1) <= 0 {
res.success = true
}
valslock.Unlock()
}

notif.PublishQueryEvent(parent, &notif.QueryEvent{
Expand All @@ -235,13 +269,21 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
return res, nil
})

// run it!
_, err = query.Run(ctx, rtp)
if len(vals) == 0 {
if err != nil {
return nil, err
go func() {
defer close(vals)
// run it!
_, err := query.Run(ctx, rtp)

if err == nil || ctx.Err() != nil {
return
}
}

// Not much we can do about the error.
// Any error that's *not* a context related error is likely a
// programmer error. There's not much that a user can do about
// it and no real reason to expose it.
log.Error(err)
}()

return vals, nil
}
Expand Down

0 comments on commit b2a323e

Please sign in to comment.