Skip to content

Commit

Permalink
Merge pull request #403 from aarshkshah1992/feat/finish-auto-bootstrap
Browse files Browse the repository at this point in the history
feat(bootstrap): take autobootstrap to completion
  • Loading branch information
Stebalien authored Nov 5, 2019
2 parents 9c02087 + 2fdad28 commit 4fd6498
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 64 deletions.
8 changes: 6 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ type IpfsDHT struct {

bootstrapCfg opts.BootstrapConfig

triggerBootstrap chan struct{}
triggerAutoBootstrap bool
triggerBootstrap chan struct{}
latestSelfWalk time.Time // the last time we looked-up our own peerID in the network
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -103,12 +105,14 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
}
dht.startBootstrapping()
return dht, nil
}

Expand Down Expand Up @@ -174,7 +178,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
writeResp := func(errorChan chan error, err error) {
select {
case <-proc.Closing():
case errorChan <- err:
case errorChan <- errChan:
}
close(errorChan)
}
Expand Down
89 changes: 49 additions & 40 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -41,52 +43,57 @@ func init() {
}
}

// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
triggerBootstrapFnc := func() {
logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap",
dht.routingTable.Size(), minRTBootstrapThreshold)

if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err)
}
// Start the bootstrap worker.
func (dht *IpfsDHT) startBootstrapping() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err)
}
}
scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
defer scanInterval.Stop()

// we should query for self periodically so we can discover closer peers
go func() {
for {
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("self walk: error: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
case <-ctx.Done():
return
// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
} else {
// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
scanInterval.Stop()
}
}()

// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
go func() {
for {
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
case now := <-scanInterval.C:
walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
if err := dht.doBootstrap(ctx, walkSelf); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-dht.triggerBootstrap:
triggerBootstrapFnc()
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-ctx.Done():
return
}
}
}()
})

return nil
}

func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
if walkSelf {
if err := dht.selfWalk(ctx); err != nil {
return fmt.Errorf("self walk: error: %s", err)
}
dht.latestSelfWalk = time.Now()
}

if err := dht.bootstrapBuckets(ctx); err != nil {
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
}

return nil
}
Expand Down Expand Up @@ -166,11 +173,13 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
return err
}

// synchronous bootstrap.
func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
return errors.Wrap(err, "failed bootstrap while searching for self")
} else {
return dht.bootstrapBuckets(ctx)
// Bootstrap tells the DHT to get into a bootstrapped state.
//
// Note: the context is ignored.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
}
return nil
}
21 changes: 18 additions & 3 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -200,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.bootstrapOnce(ctx)
dht.Bootstrap(ctx)
}
}

Expand Down Expand Up @@ -690,7 +691,18 @@ func TestBootstrap(t *testing.T) {

func TestBootstrapBelowMinRTThreshold(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t, false)

// enable auto bootstrap on A
dhtA, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
)
if err != nil {
t.Fatal(err)
}

dhtB := setupDHT(ctx, t, false)
dhtC := setupDHT(ctx, t, false)

Expand Down Expand Up @@ -783,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) {

t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.bootstrapOnce(ctx)
go dht.Bootstrap(ctx)
}

// this is async, and we dont know when it's finished with one cycle, so keep checking
Expand Down Expand Up @@ -1416,6 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}

dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
Expand Down Expand Up @@ -1454,6 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
Expand All @@ -1463,6 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/lsr/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
Expand Down
14 changes: 10 additions & 4 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -29,7 +30,8 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,7 +151,9 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])

os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -228,7 +232,8 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -297,7 +302,8 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down Expand Up @@ -80,7 +80,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down
35 changes: 22 additions & 13 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ var (

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
SelfQueryInterval time.Duration // how often to query for self
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}

// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
}

// Apply applies the given options to this Option
Expand Down Expand Up @@ -63,14 +63,13 @@ var Defaults = func(o *Options) error {
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,

// since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable
RoutingTableScanInterval: 30 * time.Minute,

Timeout: 10 * time.Second,

SelfQueryInterval: 1 * time.Hour,
}

o.TriggerAutoBootstrap = true

return nil
}

Expand Down Expand Up @@ -149,3 +148,13 @@ func BucketSize(bucketSize int) Option {
return nil
}
}

// DisableAutoBootstrap completely disables 'auto-bootstrap' on the Dht
// This means that neither will we do periodic bootstrap nor will we
// bootstrap the Dht even if the Routing Table size goes below the minimum threshold
func DisableAutoBootstrap() Option {
return func(o *Options) error {
o.TriggerAutoBootstrap = false
return nil
}
}

0 comments on commit 4fd6498

Please sign in to comment.