From e2842f0317575187a3fb1ebab080f57413bbedf6 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 1 Nov 2019 00:38:28 -0700 Subject: [PATCH 1/6] feat(bootstrap): autobootstrap 1. Auto bootstrap on start. 2. Make `Bootstrap(ctx)` trigger a bootstrap but not _start_ the bootstrapping process. --- dht.go | 1 + dht_bootstrap.go | 74 ++++++++++++++++++++++++------------------------ 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/dht.go b/dht.go index 471ae4e7b..19096b4fa 100644 --- a/dht.go +++ b/dht.go @@ -109,6 +109,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er h.SetStreamHandler(p, dht.handleNewStream) } } + dht.startBootstrapping() return dht, nil } diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 6e40c597a..ecc131d24 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -7,6 +7,8 @@ import ( "sync" "time" + process "github.com/jbenet/goprocess" + processctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p-core/routing" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" @@ -41,52 +43,47 @@ func init() { } } -// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod. -func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { - triggerBootstrapFnc := func() { - logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap", - dht.routingTable.Size(), minRTBootstrapThreshold) - - if err := dht.selfWalk(ctx); err != nil { - logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err) - } +// Bootstrap i +func (dht *IpfsDHT) startBootstrapping() error { + // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period + dht.proc.Go(func(proc process.Process) { + ctx := processctx.OnClosingContext(proc) + scanInterval := time.NewTicker(dht.bootstrapCfg.RoutingTableScanInterval) + defer scanInterval.Stop() - if err := dht.bootstrapBuckets(ctx); err != nil { - logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err) - } - } + var ( + lastSelfWalk time.Time + walkSelf = true + ) - // we should query for self periodically so we can discover closer peers - go func() { for { - err := dht.selfWalk(ctx) - if err != nil { - logger.Warningf("self walk: error: %s", err) - } - select { - case <-time.After(dht.bootstrapCfg.SelfQueryInterval): - case <-ctx.Done(): - return + if walkSelf { + walkSelf = false + err := dht.selfWalk(ctx) + if err != nil { + logger.Warningf("self walk: error: %s", err) + } else { + lastSelfWalk = time.Now() + } } - } - }() - // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period - go func() { - for { err := dht.bootstrapBuckets(ctx) if err != nil { logger.Warningf("bootstrap buckets: error bootstrapping: %s", err) } + select { - case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval): + case now := <-scanInterval.C: + // It doesn't make sense to query for self unless we're _also_ going to fill out the routing table. + walkSelf = now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval)) case <-dht.triggerBootstrap: - triggerBootstrapFnc() + walkSelf = true + logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size()) case <-ctx.Done(): return } } - }() + }) return nil } @@ -166,11 +163,14 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error { return err } -// synchronous bootstrap. -func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error { - if err := dht.selfWalk(ctx); err != nil { - return errors.Wrap(err, "failed bootstrap while searching for self") - } else { - return dht.bootstrapBuckets(ctx) +// Bootstrap tells the DHT to get into a bootstrapped state. +// +// Note: the context is ignored. +func (dht *IpfsDHT) Bootstrap(_ context.Context) error { + // Returns an error just in case we want to do that in the future. + select { + case dht.triggerBootstrap <- struct{}{}: + default: } + return nil } From 7cce5bdd96b85ebf02a4b328316f7b71ac35ce96 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Sat, 2 Nov 2019 00:37:15 +0800 Subject: [PATCH 2/6] make bootstrap synchronous & get tests to work --- dht.go | 10 ++++--- dht_bootstrap.go | 69 +++++++++++++++++++++++++++++++++--------------- dht_test.go | 21 ++++++++++++--- ext_test.go | 14 +++++++--- notif.go | 8 +++--- opts/options.go | 33 ++++++++++++++--------- 6 files changed, 105 insertions(+), 50 deletions(-) diff --git a/dht.go b/dht.go index 19096b4fa..e18a90ba0 100644 --- a/dht.go +++ b/dht.go @@ -69,7 +69,8 @@ type IpfsDHT struct { bootstrapCfg opts.BootstrapConfig - triggerBootstrap chan struct{} + triggerAutoBootstrap bool + triggerBootstrap chan *bootstrapReq } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -103,6 +104,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er dht.proc.AddChild(dht.providers.Process()) dht.Validator = cfg.Validator + dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap if !cfg.Client { for _, p := range cfg.Protocols { @@ -160,7 +162,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p routingTable: rt, protocols: protocols, bucketSize: bucketSize, - triggerBootstrap: make(chan struct{}), + triggerBootstrap: make(chan *bootstrapReq), } dht.ctx = dht.newContextWithLocalTags(ctx) @@ -172,10 +174,10 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p // come up with an alternative solution. // issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387 /*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) { - writeResp := func(errorChan chan error, err error) { + writeResp := func(errorChan chan error, errChan error) { select { case <-proc.Closing(): - case errorChan <- err: + case errorChan <- errChan: } close(errorChan) } diff --git a/dht_bootstrap.go b/dht_bootstrap.go index ecc131d24..92d887aec 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -43,42 +43,50 @@ func init() { } } +type bootstrapReq struct { + errChan chan error +} + +func makeBootstrapReq() *bootstrapReq { + errChan := make(chan error, 1) + return &bootstrapReq{errChan} +} + // Bootstrap i func (dht *IpfsDHT) startBootstrapping() error { // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period dht.proc.Go(func(proc process.Process) { ctx := processctx.OnClosingContext(proc) - scanInterval := time.NewTicker(dht.bootstrapCfg.RoutingTableScanInterval) + + scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod) defer scanInterval.Stop() var ( lastSelfWalk time.Time - walkSelf = true ) - for { - if walkSelf { - walkSelf = false - err := dht.selfWalk(ctx) - if err != nil { - logger.Warningf("self walk: error: %s", err) - } else { - lastSelfWalk = time.Now() - } - } - - err := dht.bootstrapBuckets(ctx) - if err != nil { - logger.Warningf("bootstrap buckets: error bootstrapping: %s", err) + // run bootstrap if option is set + if dht.triggerAutoBootstrap { + if err := dht.doBootstrap(ctx, true, &lastSelfWalk); err != nil { + logger.Warningf("bootstrap error: %s", err) } + } + for { select { case now := <-scanInterval.C: - // It doesn't make sense to query for self unless we're _also_ going to fill out the routing table. - walkSelf = now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval)) - case <-dht.triggerBootstrap: - walkSelf = true + walkSelf := now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval)) + if err := dht.doBootstrap(ctx, walkSelf, &lastSelfWalk); err != nil { + logger.Warning("bootstrap error: %s", err) + } + case req := <-dht.triggerBootstrap: logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size()) + err := dht.doBootstrap(ctx, true, &lastSelfWalk) + select { + case req.errChan <- err: + close(req.errChan) + default: + } case <-ctx.Done(): return } @@ -88,6 +96,22 @@ func (dht *IpfsDHT) startBootstrapping() error { return nil } +func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool, latestSelfWalk *time.Time) error { + if walkSelf { + if err := dht.selfWalk(ctx); err != nil { + return fmt.Errorf("self walk: error: %s", err) + } else { + *latestSelfWalk = time.Now() + } + } + + if err := dht.bootstrapBuckets(ctx); err != nil { + return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err) + } + + return nil +} + // bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error { doQuery := func(bucketId int, target string, f func(context.Context) error) error { @@ -167,9 +191,10 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error { // // Note: the context is ignored. func (dht *IpfsDHT) Bootstrap(_ context.Context) error { - // Returns an error just in case we want to do that in the future. + req := makeBootstrapReq() select { - case dht.triggerBootstrap <- struct{}{}: + case dht.triggerBootstrap <- req: + return <-req.errChan default: } return nil diff --git a/dht_test.go b/dht_test.go index bba86c84e..3611ab4a7 100644 --- a/dht_test.go +++ b/dht_test.go @@ -113,6 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), opts.Client(client), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), ) if err != nil { t.Fatal(err) @@ -200,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { start := rand.Intn(len(dhts)) // randomize to decrease bias. for i := range dhts { dht := dhts[(start+i)%len(dhts)] - dht.bootstrapOnce(ctx) + dht.Bootstrap(ctx) } } @@ -690,7 +691,18 @@ func TestBootstrap(t *testing.T) { func TestBootstrapBelowMinRTThreshold(t *testing.T) { ctx := context.Background() - dhtA := setupDHT(ctx, t, false) + + // enable auto bootstrap on A + dhtA, err := New( + ctx, + bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), + opts.Client(false), + opts.NamespacedValidator("v", blankValidator{}), + ) + if err != nil { + t.Fatal(err) + } + dhtB := setupDHT(ctx, t, false) dhtC := setupDHT(ctx, t, false) @@ -783,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) { t.Logf("bootstrapping them so they find each other. %d", nDHTs) for _, dht := range dhts { - go dht.bootstrapOnce(ctx) + go dht.Bootstrap(ctx) } // this is async, and we dont know when it's finished with one cycle, so keep checking @@ -1416,6 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) { opts.Protocols("/esh/dht"), opts.Client(false), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), } dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...) @@ -1454,6 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) { opts.Protocols("/esh/dht"), opts.Client(false), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), }...) if err != nil { t.Fatal(err) @@ -1463,6 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) { opts.Protocols("/lsr/dht"), opts.Client(false), opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoBootstrap(), }...) if err != nil { t.Fatal(err) diff --git a/ext_test.go b/ext_test.go index cb0ac7634..fb920506f 100644 --- a/ext_test.go +++ b/ext_test.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" + opts "github.com/libp2p/go-libp2p-kad-dht/opts" ggio "github.com/gogo/protobuf/io" u "github.com/ipfs/go-ipfs-util" @@ -29,7 +30,8 @@ func TestGetFailures(t *testing.T) { } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } @@ -149,7 +151,9 @@ func TestNotFound(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } @@ -228,7 +232,8 @@ func TestLessThanKResponses(t *testing.T) { } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } @@ -297,7 +302,8 @@ func TestMultipleQueries(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - d, err := New(ctx, hosts[0]) + os := []opts.Option{opts.DisableAutoBootstrap()} + d, err := New(ctx, hosts[0], os...) if err != nil { t.Fatal(err) } diff --git a/notif.go b/notif.go index 556cadd5f..25fe7102a 100644 --- a/notif.go +++ b/notif.go @@ -34,9 +34,9 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { if dht.host.Network().Connectedness(p) == network.Connected { bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) - if bootstrap { + if bootstrap && dht.triggerAutoBootstrap { select { - case dht.triggerBootstrap <- struct{}{}: + case dht.triggerBootstrap <- makeBootstrapReq(): default: } } @@ -80,9 +80,9 @@ func (nn *netNotifiee) testConnection(v network.Conn) { if dht.host.Network().Connectedness(p) == network.Connected { bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) - if bootstrap { + if bootstrap && dht.triggerAutoBootstrap { select { - case dht.triggerBootstrap <- struct{}{}: + case dht.triggerBootstrap <- makeBootstrapReq(): default: } } diff --git a/opts/options.go b/opts/options.go index 2fbb91e5a..fd3eb3c5a 100644 --- a/opts/options.go +++ b/opts/options.go @@ -21,20 +21,20 @@ var ( // BootstrapConfig specifies parameters used for bootstrapping the DHT. type BootstrapConfig struct { - BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it - Timeout time.Duration // how long to wait for a bootstrap query to run - RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period - SelfQueryInterval time.Duration // how often to query for self + BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it + Timeout time.Duration // how long to wait for a bootstrap query to run + SelfQueryInterval time.Duration // how often to query for self } // Options is a structure containing all the options that can be used when constructing a DHT. type Options struct { - Datastore ds.Batching - Validator record.Validator - Client bool - Protocols []protocol.ID - BucketSize int - BootstrapConfig BootstrapConfig + Datastore ds.Batching + Validator record.Validator + Client bool + Protocols []protocol.ID + BucketSize int + BootstrapConfig BootstrapConfig + TriggerAutoBootstrap bool } // Apply applies the given options to this Option @@ -63,14 +63,13 @@ var Defaults = func(o *Options) error { // same as that mentioned in the kad dht paper BucketPeriod: 1 * time.Hour, - // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable - RoutingTableScanInterval: 30 * time.Minute, - Timeout: 10 * time.Second, SelfQueryInterval: 1 * time.Hour, } + o.TriggerAutoBootstrap = true + return nil } @@ -149,3 +148,11 @@ func BucketSize(bucketSize int) Option { return nil } } + +// DisableAutoBootstrap disables auto bootstrap on the dht +func DisableAutoBootstrap() Option { + return func(o *Options) error { + o.TriggerAutoBootstrap = false + return nil + } +} From da6edafb21d2a6f6bcdc4fe9364e02e0c706eade Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Sat, 2 Nov 2019 11:37:44 +0800 Subject: [PATCH 3/6] make Bootstrap() async and latestSelfWalk a dht field --- dht.go | 3 ++- dht_bootstrap.go | 20 +++++++------------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/dht.go b/dht.go index e18a90ba0..de14842ba 100644 --- a/dht.go +++ b/dht.go @@ -71,6 +71,7 @@ type IpfsDHT struct { triggerAutoBootstrap bool triggerBootstrap chan *bootstrapReq + latestSelfWalk time.Time // the last time we looked-up our own peerID in the network } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -174,7 +175,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p // come up with an alternative solution. // issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387 /*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) { - writeResp := func(errorChan chan error, errChan error) { + writeResp := func(errorChan chan error, err error) { select { case <-proc.Closing(): case errorChan <- errChan: diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 92d887aec..aaae79f23 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -61,13 +61,9 @@ func (dht *IpfsDHT) startBootstrapping() error { scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod) defer scanInterval.Stop() - var ( - lastSelfWalk time.Time - ) - // run bootstrap if option is set if dht.triggerAutoBootstrap { - if err := dht.doBootstrap(ctx, true, &lastSelfWalk); err != nil { + if err := dht.doBootstrap(ctx, true); err != nil { logger.Warningf("bootstrap error: %s", err) } } @@ -75,13 +71,13 @@ func (dht *IpfsDHT) startBootstrapping() error { for { select { case now := <-scanInterval.C: - walkSelf := now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval)) - if err := dht.doBootstrap(ctx, walkSelf, &lastSelfWalk); err != nil { + walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval)) + if err := dht.doBootstrap(ctx, walkSelf); err != nil { logger.Warning("bootstrap error: %s", err) } case req := <-dht.triggerBootstrap: logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size()) - err := dht.doBootstrap(ctx, true, &lastSelfWalk) + err := dht.doBootstrap(ctx, true) select { case req.errChan <- err: close(req.errChan) @@ -96,12 +92,12 @@ func (dht *IpfsDHT) startBootstrapping() error { return nil } -func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool, latestSelfWalk *time.Time) error { +func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error { if walkSelf { if err := dht.selfWalk(ctx); err != nil { return fmt.Errorf("self walk: error: %s", err) } else { - *latestSelfWalk = time.Now() + dht.latestSelfWalk = time.Now() } } @@ -191,10 +187,8 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error { // // Note: the context is ignored. func (dht *IpfsDHT) Bootstrap(_ context.Context) error { - req := makeBootstrapReq() select { - case dht.triggerBootstrap <- req: - return <-req.errChan + case dht.triggerBootstrap <- makeBootstrapReq(): default: } return nil From 632f3c5cb51cca73908ad84b39cc26e870d6e259 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Sun, 3 Nov 2019 19:57:50 +0800 Subject: [PATCH 4/6] better handling for disable bootstrap option --- dht_bootstrap.go | 6 ++++-- opts/options.go | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index aaae79f23..7bf467865 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -66,6 +66,9 @@ func (dht *IpfsDHT) startBootstrapping() error { if err := dht.doBootstrap(ctx, true); err != nil { logger.Warningf("bootstrap error: %s", err) } + } else { + // disable the "auto-bootstrap" ticker so that no more ticks are sent to his channel + scanInterval.Stop() } for { @@ -96,9 +99,8 @@ func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error { if walkSelf { if err := dht.selfWalk(ctx); err != nil { return fmt.Errorf("self walk: error: %s", err) - } else { - dht.latestSelfWalk = time.Now() } + dht.latestSelfWalk = time.Now() } if err := dht.bootstrapBuckets(ctx); err != nil { diff --git a/opts/options.go b/opts/options.go index fd3eb3c5a..39a29cced 100644 --- a/opts/options.go +++ b/opts/options.go @@ -149,7 +149,9 @@ func BucketSize(bucketSize int) Option { } } -// DisableAutoBootstrap disables auto bootstrap on the dht +// DisableAutoBootstrap completely disables 'auto-bootstrap' on the Dht +// This means that neither will we do periodic bootstrap nor will we +// bootstrap the Dht even if the Routing Table size goes below the minimum threshold func DisableAutoBootstrap() Option { return func(o *Options) error { o.TriggerAutoBootstrap = false From 98cf91494dd4a8c0e66a6b7ec8c88ccac1184682 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 5 Nov 2019 11:10:46 +0000 Subject: [PATCH 5/6] chore(bootstrap): remove unecessary request structure --- dht.go | 4 ++-- dht_bootstrap.go | 20 ++++---------------- notif.go | 4 ++-- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/dht.go b/dht.go index de14842ba..ac833bd5d 100644 --- a/dht.go +++ b/dht.go @@ -70,7 +70,7 @@ type IpfsDHT struct { bootstrapCfg opts.BootstrapConfig triggerAutoBootstrap bool - triggerBootstrap chan *bootstrapReq + triggerBootstrap chan struct{} latestSelfWalk time.Time // the last time we looked-up our own peerID in the network } @@ -163,7 +163,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p routingTable: rt, protocols: protocols, bucketSize: bucketSize, - triggerBootstrap: make(chan *bootstrapReq), + triggerBootstrap: make(chan struct{}), } dht.ctx = dht.newContextWithLocalTags(ctx) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 7bf467865..193c49d80 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -43,15 +43,6 @@ func init() { } } -type bootstrapReq struct { - errChan chan error -} - -func makeBootstrapReq() *bootstrapReq { - errChan := make(chan error, 1) - return &bootstrapReq{errChan} -} - // Bootstrap i func (dht *IpfsDHT) startBootstrapping() error { // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period @@ -78,13 +69,10 @@ func (dht *IpfsDHT) startBootstrapping() error { if err := dht.doBootstrap(ctx, walkSelf); err != nil { logger.Warning("bootstrap error: %s", err) } - case req := <-dht.triggerBootstrap: + case <-dht.triggerBootstrap: logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size()) - err := dht.doBootstrap(ctx, true) - select { - case req.errChan <- err: - close(req.errChan) - default: + if err := dht.doBootstrap(ctx, true); err != nil { + logger.Warning("bootstrap error: %s", err) } case <-ctx.Done(): return @@ -190,7 +178,7 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error { // Note: the context is ignored. func (dht *IpfsDHT) Bootstrap(_ context.Context) error { select { - case dht.triggerBootstrap <- makeBootstrapReq(): + case dht.triggerBootstrap <- struct{}{}: default: } return nil diff --git a/notif.go b/notif.go index 25fe7102a..7c47215c3 100644 --- a/notif.go +++ b/notif.go @@ -36,7 +36,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { dht.Update(dht.Context(), p) if bootstrap && dht.triggerAutoBootstrap { select { - case dht.triggerBootstrap <- makeBootstrapReq(): + case dht.triggerBootstrap <- struct{}{}: default: } } @@ -82,7 +82,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) { dht.Update(dht.Context(), p) if bootstrap && dht.triggerAutoBootstrap { select { - case dht.triggerBootstrap <- makeBootstrapReq(): + case dht.triggerBootstrap <- struct{}{}: default: } } From 2fdad28d5c492c1f119b99170bf9109c3574c838 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 5 Nov 2019 11:13:44 +0000 Subject: [PATCH 6/6] chore(doc): fix comments --- dht_bootstrap.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 193c49d80..914e01cfe 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -43,7 +43,7 @@ func init() { } } -// Bootstrap i +// Start the bootstrap worker. func (dht *IpfsDHT) startBootstrapping() error { // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period dht.proc.Go(func(proc process.Process) { @@ -58,7 +58,7 @@ func (dht *IpfsDHT) startBootstrapping() error { logger.Warningf("bootstrap error: %s", err) } } else { - // disable the "auto-bootstrap" ticker so that no more ticks are sent to his channel + // disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel scanInterval.Stop() }