Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dual DHT scaffold #570

Merged
merged 13 commits into from
Apr 9, 2020
17 changes: 9 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type IpfsDHT struct {
// DHT protocols we can respond to.
serverProtocols []protocol.ID

auto bool
auto ModeOpt
mode mode
modeLk sync.Mutex

Expand Down Expand Up @@ -159,15 +159,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)

dht.Validator = cfg.validator

dht.auto = cfg.mode
switch cfg.mode {
case ModeAuto:
dht.auto = true
case ModeAuto, ModeClient:
dht.mode = modeClient
case ModeClient:
dht.auto = false
dht.mode = modeClient
case ModeServer:
dht.auto = false
case ModeAutoServer, ModeServer:
dht.mode = modeServer
default:
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
Expand Down Expand Up @@ -312,6 +308,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
return rt, err
}

// Mode allows introspection of the operation mode of the DHT
func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
for {
Expand Down
11 changes: 11 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
ModeClient
// ModeServer operates the DHT as a server, it can both send and respond to queries
ModeServer
// ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown
ModeAutoServer
)

// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
Expand Down Expand Up @@ -256,6 +258,15 @@ func ProtocolPrefix(prefix protocol.ID) Option {
}
}

// ProtocolExtension adds an application specific protocol to the DHT protocol. For example,
// /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0. extension should be of the form /lan.
func ProtocolExtension(ext protocol.ID) Option {
return func(c *config) error {
c.protocolPrefix += ext
return nil
}
}

// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
Expand Down
36 changes: 6 additions & 30 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"

"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
Expand Down Expand Up @@ -72,33 +73,8 @@ type blankValidator struct{}
func (blankValidator) Validate(_ string, _ []byte) error { return nil }
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

type testValidator struct{}

func (testValidator) Select(_ string, bs [][]byte) (int, error) {
index := -1
for i, b := range bs {
if bytes.Equal(b, []byte("newer")) {
index = i
} else if bytes.Equal(b, []byte("valid")) {
if index == -1 {
index = i
}
}
}
if index == -1 {
return -1, errors.New("no rec found")
}
return index, nil
}
func (testValidator) Validate(_ string, b []byte) error {
if bytes.Equal(b, []byte("expired")) {
return errors.New("expired")
}
return nil
}

type testAtomicPutValidator struct {
testValidator
test.TestValidator
}

// selects the entry with the 'highest' last byte
Expand Down Expand Up @@ -372,7 +348,7 @@ func TestValueSetInvalid(t *testing.T) {
defer dhtA.host.Close()
defer dhtB.host.Close()

dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}

connect(t, ctx, dhtA, dhtB)
Expand Down Expand Up @@ -451,8 +427,8 @@ func TestSearchValue(t *testing.T) {

connect(t, ctx, dhtA, dhtB)

dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}

ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
Expand Down Expand Up @@ -554,7 +530,7 @@ func TestValueGetInvalid(t *testing.T) {
defer dhtB.host.Close()

dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{}

connect(t, ctx, dhtA, dhtB)

Expand Down
216 changes: 216 additions & 0 deletions dual/dual.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Package dual provides an implementaiton of a split or "dual" dht, where two parallel instances
// are maintained for the global internet and the local LAN respectively.
package dual

import (
"context"
"sync"

"github.com/ipfs/go-cid"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
helper "github.com/libp2p/go-libp2p-routing-helpers"

"github.com/hashicorp/go-multierror"
)

// DHT implements the routing interface to provide two concrete DHT implementationts for use
// in IPFS that are used to support both global network users and disjoint LAN usecases.
type DHT struct {
WAN *dht.IpfsDHT
LAN *dht.IpfsDHT
}

// LanExtension is used to differentiate local protocol requests from those on the WAN DHT.
const LanExtension protocol.ID = "/lan"

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*DHT)(nil)
_ routing.Routing = (*DHT)(nil)
_ routing.PeerRouting = (*DHT)(nil)
_ routing.PubKeyFetcher = (*DHT)(nil)
_ routing.ValueStore = (*DHT)(nil)
)

// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete
// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce
// the LAN-vs-WAN distinction.
willscott marked this conversation as resolved.
Show resolved Hide resolved
// Note: query or routing table functional options provided as arguments to this function
// will be overriden by this constructor.
func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) {
wanOpts := append(options,
dht.QueryFilter(dht.PublicQueryFilter),
dht.RoutingTableFilter(dht.PublicRoutingTableFilter),
)
wan, err := dht.New(ctx, h, wanOpts...)
if err != nil {
return nil, err
}

// Unless overridden by user supplied options, the LAN DHT should default
// to 'AutoServer' mode.
lanOpts := append(options,
dht.ProtocolExtension(LanExtension),
dht.QueryFilter(dht.PrivateQueryFilter),
dht.RoutingTableFilter(dht.PrivateRoutingTableFilter),
)
if wan.Mode() != dht.ModeClient {
lanOpts = append(lanOpts, dht.Mode(dht.ModeServer))
}
lan, err := dht.New(ctx, h, lanOpts...)
willscott marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

impl := DHT{wan, lan}
return &impl, nil
}

// Close closes the DHT context.
func (dht *DHT) Close() error {
return multierror.Append(dht.WAN.Close(), dht.LAN.Close()).ErrorOrNil()
}

func (dht *DHT) activeWAN() bool {
return dht.WAN.RoutingTable().Size() > 0
}

// Provide adds the given cid to the content routing system.
func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error {
if dht.activeWAN() {
return dht.WAN.Provide(ctx, key, announce)
}
return dht.LAN.Provide(ctx, key, announce)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien @willscott I thought we were concerned about constantly providing/putting to people on our LANs. Did we decide that wasn't a big deal or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this only does the puts/provides to the LAN-filtered DHT when there are no discovered peers in the WAN DHT routing table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh right, I'm being silly didn't see the return 🤦

}

// FindProvidersAsync searches for peers who are able to provide a given key
func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
reqCtx, cancel := context.WithCancel(ctx)
outCh := make(chan peer.AddrInfo)
wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count)
lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count)
zeroCount := (count == 0)
go func() {
defer cancel()
defer close(outCh)

found := make(map[peer.ID]struct{}, count)
var pi peer.AddrInfo
for (zeroCount || count > 0) && (wanCh != nil || lanCh != nil) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, fine, this is definitely cleaner than the way I fixed this...

var ok bool
select {
case pi, ok = <-wanCh:
if !ok {
wanCh = nil
continue
}
case pi, ok = <-lanCh:
if !ok {
lanCh = nil
continue
}
}
// already found
if _, ok = found[pi.ID]; ok {
continue
}

select {
case outCh <- pi:
found[pi.ID] = struct{}{}
count--
case <-ctx.Done():
return
}
}
}()
return outCh
}

// FindPeer searches for a peer with given ID
// Note: with signed peer records, we can change this to short circuit once either DHT returns.
func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) {
var wg sync.WaitGroup
wg.Add(2)
var wanInfo, lanInfo peer.AddrInfo
var wanErr, lanErr error
go func() {
defer wg.Done()
wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid)
}()
go func() {
defer wg.Done()
lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid)
}()

wg.Wait()

return peer.AddrInfo{
ID: pid,
Addrs: append(wanInfo.Addrs, lanInfo.Addrs...),
}, multierror.Append(wanErr, lanErr).ErrorOrNil()
}

// Bootstrap allows callers to hint to the routing system to get into a
// Boostrapped state and remain there.
func (dht *DHT) Bootstrap(ctx context.Context) error {
erra := dht.WAN.Bootstrap(ctx)
errb := dht.LAN.Bootstrap(ctx)
return multierror.Append(erra, errb).ErrorOrNil()
}

// PutValue adds value corresponding to given Key.
func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
if dht.activeWAN() {
return dht.WAN.PutValue(ctx, key, val, opts...)
}
return dht.LAN.PutValue(ctx, key, val, opts...)
}

// GetValue searches for the value corresponding to given Key.
func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
lanCtx, cancelLan := context.WithCancel(ctx)
defer cancelLan()

var (
lanVal []byte
lanErr error
lanWaiter sync.WaitGroup
)
lanWaiter.Add(1)
go func() {
defer lanWaiter.Done()
lanVal, lanErr = d.LAN.GetValue(lanCtx, key, opts...)
}()

wanVal, wanErr := d.WAN.GetValue(ctx, key, opts...)
if wanErr == nil {
cancelLan()
}
lanWaiter.Wait()
if wanErr != nil {
if lanErr != nil {
return nil, multierror.Append(wanErr, lanErr).ErrorOrNil()
}
return lanVal, nil
}
return wanVal, nil
}

// SearchValue searches for better values from this value
func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
return p.SearchValue(ctx, key, opts...)
}

// GetPublicKey returns the public key for the given peer.
func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) {
p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator}
return p.GetPublicKey(ctx, pid)
}
Loading