Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bootstrap): take autobootstrap to completion #403

Merged
merged 6 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ type IpfsDHT struct {

bootstrapCfg opts.BootstrapConfig

triggerBootstrap chan struct{}
triggerAutoBootstrap bool
triggerBootstrap chan struct{}
latestSelfWalk time.Time // the last time we looked-up our own peerID in the network
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -103,12 +105,14 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
}
dht.startBootstrapping()
return dht, nil
}

Expand Down Expand Up @@ -174,7 +178,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
writeResp := func(errorChan chan error, err error) {
select {
case <-proc.Closing():
case errorChan <- err:
case errorChan <- errChan:
}
close(errorChan)
}
Expand Down
89 changes: 49 additions & 40 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -41,52 +43,57 @@ func init() {
}
}

// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
triggerBootstrapFnc := func() {
logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap",
dht.routingTable.Size(), minRTBootstrapThreshold)

if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err)
}
// Start the bootstrap worker.
func (dht *IpfsDHT) startBootstrapping() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err)
}
}
scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
defer scanInterval.Stop()

// we should query for self periodically so we can discover closer peers
go func() {
for {
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("self walk: error: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
case <-ctx.Done():
return
// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
} else {
// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
scanInterval.Stop()
}
}()

// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
go func() {
for {
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
case now := <-scanInterval.C:
walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
// Periodic scan: walkSelf was computed from the ticker time and latestSelfWalk above.
// Use Warningf (not Warning) so the %s verb is actually expanded with err.
if err := dht.doBootstrap(ctx, walkSelf); err != nil {
	logger.Warningf("bootstrap error: %s", err)
}
case <-dht.triggerBootstrap:
triggerBootstrapFnc()
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
// Explicitly triggered round: always include the self walk.
// Use Warningf (not Warning) so the %s verb is actually expanded with err.
if err := dht.doBootstrap(ctx, true); err != nil {
	logger.Warningf("bootstrap error: %s", err)
}
case <-ctx.Done():
return
}
}
}()
})

return nil
}

// doBootstrap performs one bootstrap round.
//
// When walkSelf is true it first calls selfWalk; if that fails the error is
// returned immediately and bootstrapBuckets is not run. After a successful
// self walk the current time is stored in latestSelfWalk. bootstrapBuckets is
// then called regardless of walkSelf, and any error from it is returned with
// added context.
func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
	if walkSelf {
		walkErr := dht.selfWalk(ctx)
		if walkErr != nil {
			return fmt.Errorf("self walk: error: %s", walkErr)
		}
		// Only a successful self walk updates the timestamp.
		dht.latestSelfWalk = time.Now()
	}

	bucketErr := dht.bootstrapBuckets(ctx)
	if bucketErr == nil {
		return nil
	}
	return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", bucketErr)
}
Expand Down Expand Up @@ -166,11 +173,13 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
return err
}

// synchronous bootstrap.
func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
return errors.Wrap(err, "failed bootstrap while searching for self")
} else {
return dht.bootstrapBuckets(ctx)
// Bootstrap tells the DHT to get into a bootstrapped state.
//
// Note: the context is ignored.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bootstrap is actually supposed to be async (at the moment). We can change this, but we'll need to propagate the change up the interfaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed below, this will now be async.

select {
case dht.triggerBootstrap <- struct{}{}:
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to behave inconsistently. If we're currently bootstrapping, we'll return immediately.

If we're going to block, a new bootstrap call should probably try to "join" an existing one. However, that will add some complexity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hsanjuan what is cluster's specific need:

  1. Do you need a way to tell the DHT to bootstrap?
  2. Do you need to be able to wait for the DHT to finish bootstrapping?

Note: queries should still work even if the DHT isn't bootstrapped, as long as you have some peers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a peer to start getting "well-connected" after the first contact with other peer(s) (before that, the peer would have had no connections). I don't need to wait until the end of the bootstrap round (it can just happen in the background).

You may say "well, call Bootstrap()" then and not before, but things are a bit more tricky (the dht may have already been bootstrapped externally and what not). Being able to trigger a bootstrap round on demand (independent from the recurring configured one) is handy here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a peer to start getting "well-connected" after the first contact with other peer(s) (before that, the peer would have had no connections).

This should now be automatic.

Being able to trigger a bootstrap round on demand (independent from the recurring configured one) is handy here.

SGTM. I agree bootstrap should actually trigger a bootstrap.


@aarshkshah1992 given this, I wouldn't wait. I'd just trigger the bootstrap and move on.

}
return nil
}
21 changes: 18 additions & 3 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -200,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.bootstrapOnce(ctx)
dht.Bootstrap(ctx)
}
}

Expand Down Expand Up @@ -690,7 +691,18 @@ func TestBootstrap(t *testing.T) {

func TestBootstrapBelowMinRTThreshold(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t, false)

// enable auto bootstrap on A
dhtA, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
)
if err != nil {
t.Fatal(err)
}

dhtB := setupDHT(ctx, t, false)
dhtC := setupDHT(ctx, t, false)

Expand Down Expand Up @@ -783,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) {

t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.bootstrapOnce(ctx)
go dht.Bootstrap(ctx)
}

// this is async, and we dont know when it's finished with one cycle, so keep checking
Expand Down Expand Up @@ -1416,6 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}

dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
Expand Down Expand Up @@ -1454,6 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
Expand All @@ -1463,6 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/lsr/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
Expand Down
14 changes: 10 additions & 4 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -29,7 +30,8 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,7 +151,9 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])

os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -228,7 +232,8 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -297,7 +302,8 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests rely on a specific number of peers in the RT, or a specific number of results in a query, after a peer has finished connecting to other peers. However, this code causes the peer to connect to more peers, and to add more peers to the RT, than the test intended, and that causes the test to fail. So, we need to disable this feature for tests.

select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down Expand Up @@ -80,7 +80,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down
35 changes: 22 additions & 13 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ var (

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
SelfQueryInterval time.Duration // how often to query for self
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}

// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
}

// Apply applies the given options to this Option
Expand Down Expand Up @@ -63,14 +63,13 @@ var Defaults = func(o *Options) error {
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,

// since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable
RoutingTableScanInterval: 30 * time.Minute,

Timeout: 10 * time.Second,

SelfQueryInterval: 1 * time.Hour,
}

o.TriggerAutoBootstrap = true

return nil
}

Expand Down Expand Up @@ -149,3 +148,13 @@ func BucketSize(bucketSize int) Option {
return nil
}
}

// DisableAutoBootstrap returns an Option that sets Options.TriggerAutoBootstrap
// to false, completely disabling "auto-bootstrap" on the DHT.
//
// With auto-bootstrap disabled, the DHT neither bootstraps periodically nor
// bootstraps when the routing table size falls below the minimum threshold.
func DisableAutoBootstrap() Option {
	disable := func(o *Options) error {
		o.TriggerAutoBootstrap = false
		return nil
	}
	return disable
}