diff --git a/dht.go b/dht.go index 471ae4e7b..ac833bd5d 100644 --- a/dht.go +++ b/dht.go @@ -69,7 +69,9 @@ type IpfsDHT struct { bootstrapCfg opts.BootstrapConfig - triggerBootstrap chan struct{} + triggerAutoBootstrap bool + triggerBootstrap chan struct{} + latestSelfWalk time.Time // the last time we looked-up our own peerID in the network } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -103,12 +105,14 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er dht.proc.AddChild(dht.providers.Process()) dht.Validator = cfg.Validator + dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap if !cfg.Client { for _, p := range cfg.Protocols { h.SetStreamHandler(p, dht.handleNewStream) } } + dht.startBootstrapping() return dht, nil } @@ -174,7 +178,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p writeResp := func(errorChan chan error, err error) { select { case <-proc.Closing(): - case errorChan <- err: + case errorChan <- errChan: } close(errorChan) } diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 6e40c597a..914e01cfe 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -7,6 +7,8 @@ import ( "sync" "time" + process "github.com/jbenet/goprocess" + processctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p-core/routing" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" @@ -41,52 +43,57 @@ func init() { } } -// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod. -func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { - triggerBootstrapFnc := func() { - logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap", - dht.routingTable.Size(), minRTBootstrapThreshold) - - if err := dht.selfWalk(ctx); err != nil { - logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err) - } +// Start the bootstrap worker. +func (dht *IpfsDHT) startBootstrapping() error { + // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period + dht.proc.Go(func(proc process.Process) { + ctx := processctx.OnClosingContext(proc) - if err := dht.bootstrapBuckets(ctx); err != nil { - logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err) - } - } + scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod) + defer scanInterval.Stop() - // we should query for self periodically so we can discover closer peers - go func() { - for { - err := dht.selfWalk(ctx) - if err != nil { - logger.Warningf("self walk: error: %s", err) - } - select { - case <-time.After(dht.bootstrapCfg.SelfQueryInterval): - case <-ctx.Done(): - return + // run bootstrap if option is set + if dht.triggerAutoBootstrap { + if err := dht.doBootstrap(ctx, true); err != nil { + logger.Warningf("bootstrap error: %s", err) } + } else { + // disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel + scanInterval.Stop() } - }() - // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period - go func() { for { - err := dht.bootstrapBuckets(ctx) - if err != nil { - logger.Warningf("bootstrap buckets: error bootstrapping: %s", err) - } select { - case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval): + case now := <-scanInterval.C: + walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval)) + if err := dht.doBootstrap(ctx, walkSelf); err != nil { + logger.Warning("bootstrap error: %s", err) + } case <-dht.triggerBootstrap: - triggerBootstrapFnc() + logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size()) + if err := dht.doBootstrap(ctx, true); err != nil { + logger.Warning("bootstrap error: %s", err) + } case <-ctx.Done(): return } } - }() + }) + + return nil +} + +func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error { + if walkSelf { + if err := dht.selfWalk(ctx); err != nil { + return fmt.Errorf("self walk: error: %s", err) + } + dht.latestSelfWalk = time.Now() + } + + if err := dht.bootstrapBuckets(ctx); err != nil { + return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err) + } return nil } @@ -166,11 +173,13 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error { return err } -// synchronous bootstrap. -func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error { - if err := dht.selfWalk(ctx); err != nil { - return errors.Wrap(err, "failed bootstrap while searching for self") - } else { - return dht.bootstrapBuckets(ctx) +// Bootstrap tells the DHT to get into a bootstrapped state. +// +// Note: the context is ignored. +func (dht *IpfsDHT) Bootstrap(_ context.Context) error { + select { + case dht.triggerBootstrap <- struct{}{}: + default: } + return nil } diff --git a/dht_test.go b/dht_test.go index bba86c84e..3611ab4a7 100644 --- a/dht_test.go +++ b/dht_test.go @@ -113,6 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), opts.Client(client), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), ) if err != nil { t.Fatal(err) @@ -200,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { start := rand.Intn(len(dhts)) // randomize to decrease bias. for i := range dhts { dht := dhts[(start+i)%len(dhts)] - dht.bootstrapOnce(ctx) + dht.Bootstrap(ctx) } } @@ -690,7 +691,18 @@ func TestBootstrap(t *testing.T) { func TestBootstrapBelowMinRTThreshold(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) + + // enable auto bootstrap on A + dhtA, err := New( + ctx, + bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), + opts.Client(false), + opts.NamespacedValidator("v", blankValidator{}), + ) + if err != nil { + t.Fatal(err) + } + dhtB := setupDHT(ctx, t, false) dhtC := setupDHT(ctx, t, false) @@ -783,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) { t.Logf("bootstrapping them so they find each other. %d", nDHTs) for _, dht := range dhts { - go dht.bootstrapOnce(ctx) + go dht.Bootstrap(ctx) } // this is async, and we dont know when it's finished with one cycle, so keep checking @@ -1416,6 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) { opts.Protocols("/esh/dht"), opts.Client(false), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), } dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...) @@ -1454,6 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) { opts.Protocols("/esh/dht"), opts.Client(false), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), }...) if err != nil { t.Fatal(err) @@ -1463,6 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) { opts.Protocols("/lsr/dht"), opts.Client(false), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), }...) if err != nil { t.Fatal(err) diff --git a/ext_test.go b/ext_test.go index cb0ac7634..fb920506f 100644 --- a/ext_test.go +++ b/ext_test.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" + opts "github.com/libp2p/go-libp2p-kad-dht/opts" ggio "github.com/gogo/protobuf/io" u "github.com/ipfs/go-ipfs-util" @@ -29,7 +30,8 @@ func TestGetFailures(t *testing.T) { } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } @@ -149,7 +151,9 @@ func TestNotFound(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } @@ -228,7 +232,8 @@ func TestLessThanKResponses(t *testing.T) { } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } @@ -297,7 +302,8 @@ func TestMultipleQueries(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } diff --git a/notif.go b/notif.go index 556cadd5f..7c47215c3 100644 --- a/notif.go +++ b/notif.go @@ -34,7 +34,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { if dht.host.Network().Connectedness(p) == network.Connected { bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) - if bootstrap { + if bootstrap && dht.triggerAutoBootstrap { select { case dht.triggerBootstrap <- struct{}{}: default: @@ -80,7 +80,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) { if dht.host.Network().Connectedness(p) == network.Connected { bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) - if bootstrap { + if bootstrap && dht.triggerAutoBootstrap { select { case dht.triggerBootstrap <- struct{}{}: default: diff --git a/opts/options.go b/opts/options.go index 2fbb91e5a..39a29cced 100644 --- a/opts/options.go +++ b/opts/options.go @@ -21,20 +21,20 @@ var ( // BootstrapConfig specifies parameters used for bootstrapping the DHT. type BootstrapConfig struct { - BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it - Timeout time.Duration // how long to wait for a bootstrap query to run - RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period - SelfQueryInterval time.Duration // how often to query for self + BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it + Timeout time.Duration // how long to wait for a bootstrap query to run + SelfQueryInterval time.Duration // how often to query for self } // Options is a structure containing all the options that can be used when constructing a DHT. type Options struct { - Datastore ds.Batching - Validator record.Validator - Client bool - Protocols []protocol.ID - BucketSize int - BootstrapConfig BootstrapConfig + Datastore ds.Batching + Validator record.Validator + Client bool + Protocols []protocol.ID + BucketSize int + BootstrapConfig BootstrapConfig + TriggerAutoBootstrap bool } // Apply applies the given options to this Option @@ -63,14 +63,13 @@ var Defaults = func(o *Options) error { // same as that mentioned in the kad dht paper BucketPeriod: 1 * time.Hour, - // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable - RoutingTableScanInterval: 30 * time.Minute, - Timeout: 10 * time.Second, SelfQueryInterval: 1 * time.Hour, } + o.TriggerAutoBootstrap = true + return nil } @@ -149,3 +148,13 @@ func BucketSize(bucketSize int) Option { return nil } } + +// DisableAutoBootstrap completely disables 'auto-bootstrap' on the Dht +// This means that neither will we do periodic bootstrap nor will we +// bootstrap the Dht even if the Routing Table size goes below the minimum threshold +func DisableAutoBootstrap() Option { + return func(o *Options) error { + o.TriggerAutoBootstrap = false + return nil + } +}