routing/dht: periodic bootstrapping #572
jbenet committed Jan 16, 2015
1 parent 647da1b commit dcdd63e
Showing 4 changed files with 328 additions and 102 deletions.
2 changes: 1 addition & 1 deletion core/bootstrap.go
@@ -86,7 +86,7 @@ func bootstrap(ctx context.Context,

// we can try running dht bootstrap even if we're connected to all bootstrap peers.
if len(h.Network().Conns()) > 0 {
- if err := r.Bootstrap(ctx, numDHTBootstrapQueries); err != nil {
+ if _, err := r.Bootstrap(); err != nil {
// log this as Info. later on, discern better between errors.
log.Infof("dht bootstrap err: %s", err)
return nil
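The new Bootstrap signature returns a goprocess.Process along with the error, and the call above discards it. A caller that wants to stop periodic bootstrapping explicitly could keep the handle instead; a minimal sketch, assuming r implements the new Bootstrap signature (where to store proc, and the shutdown hook itself, are left open by this commit):

proc, err := r.Bootstrap()
if err != nil {
	log.Infof("dht bootstrap err: %s", err)
	return nil
}
// later, e.g. on node shutdown (hypothetical placement; this commit discards proc):
if err := proc.Close(); err != nil {
	log.Infof("dht bootstrap shutdown err: %s", err)
}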
63 changes: 0 additions & 63 deletions routing/dht/dht.go
@@ -353,66 +353,3 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
}
}
}

// Bootstrap builds up a list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
var merr u.MultiErr

randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine because
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
return peer.ID(id)
}

// bootstrap sequentially, as results will compound
runQuery := func(ctx context.Context, id peer.ID) {
p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
merr = append(merr, err)
} else {
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Errorf("%s", err)
merr = append(merr, err)
}
}

sequential := true
if sequential {
// these should normally be parallel, but we can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}

} else {
// note on parallelism here: the context is passed in to the queries, so they
// **should** exit when its deadline is exceeded, making this function exit on
// ctx cancel. normally, we should be selecting on ctx.Done() here too, but this
// gets complicated to do with a WaitGroup, and doesn't wait for the children to
// exit. note also that runQuery appends to merr, so if this branch is ever
// enabled, merr needs a lock.
var wg sync.WaitGroup
for i := 0; i < queries; i++ {
wg.Add(1)
go func(i int) { // pass i by value: the loop variable is shared across iterations
defer wg.Done()

id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}(i)
}
wg.Wait()
}

if len(merr) > 0 {
return merr
}
return nil
}
181 changes: 181 additions & 0 deletions routing/dht/dht_bootstrap.go
@@ -0,0 +1,181 @@
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
package dht

import (
"crypto/rand"
"fmt"
"sync"
"time"

peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)

// DefaultBootstrapQueries specifies how many queries to run,
// if the user does not specify a different number as an option.
//
// For now, this is set to 16 queries, which is an aggressive number.
// We are currently more interested in ensuring we have a properly formed
// DHT than making sure our dht minimizes traffic. Once we are more certain
// of our implementation's robustness, we should lower this down to 8 or 4.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries.
const DefaultBootstrapQueries = 16

// DefaultBootstrapPeriod specifies how often to periodically run bootstrap,
// if the user does not specify a different number as an option.
//
// For now, this is set to 10 seconds, which is an aggressive period. We are
// currently more interested in ensuring we have a properly formed DHT than
// making sure our dht minimizes traffic. Once we are more certain of our
// implementation's robustness, we should raise this to 30s or 1m.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries.
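// (Illustrative arithmetic, not measured: 16 queries every 10s averages
// 1.6 queries/s; e.g. 8 queries every 20s would cut that to 0.4 queries/s.)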
const DefaultBootstrapPeriod = 10 * time.Second

// Bootstrap runs bootstrapping once, then calls BootstrapOnSignal with default
// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
// the user to catch an error off the bat if the connections are faulty. It also
// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
// for instrumenting it in tests, or for delaying bootstrap until the network is
// online and connected to at least a few nodes.
//
// Like BootstrapOnSignal, Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) {

if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil {
return nil, err
}

sig := time.Tick(DefaultBootstrapPeriod)
return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
}

// BootstrapOnSignal ensures the dht routing table remains healthy as peers come
// and go. It builds up a list of peers by requesting random peer IDs. The
// bootstrap process will run a number of queries each time, and will run every
// time the signal fires. These parameters are configurable.
//
// Like Bootstrap, BootstrapOnSignal returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) {
if queries <= 0 {
return nil, fmt.Errorf("invalid number of queries: %d", queries)
}

if signal == nil {
return nil, fmt.Errorf("invalid signal: %v", signal)
}

proc := goprocess.Go(func(worker goprocess.Process) {
for {
select {
case <-worker.Closing():
log.Debug("dht bootstrapper shutting down")
return

case <-signal:
// it would be useful to be able to send out signals when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?

ctx := dht.Context()
if err := dht.runBootstrap(ctx, queries); err != nil {
log.Error(err)
// A bootstrapping error is important to notice but not fatal.
// maybe the client should be able to consume these errors,
// though I don't have a clear use case in mind -- what **could**
// the client do if one of the bootstrap calls fails?
//
// This is also related to the core's bootstrap failures.
// superviseConnections should perhaps allow clients to detect
// bootstrapping problems.
//
// Anyway, passing errors could be done with a bootstrapper object.
// this would imply the client should be able to consume a lot of
// other non-fatal dht errors too. providing this functionality
// should be done correctly DHT-wide.
// NB: whatever the design, clients must ensure they drain errors!
// This pattern is common to many things, perhaps long-running services
// should have something like an ErrStream that allows clients to consume
// periodic errors and take action. It should allow the user to also
// ignore all errors with something like an ErrStreamDiscard. We should
// study what other systems do for ideas.
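// A sketch of one such pattern (hypothetical; nothing below is implemented):
//
//   errs := make(chan error, 8) // the "ErrStream", with a small buffer
//   select {
//   case errs <- err: // hand the error to a draining client
//   default:          // nobody is draining; drop rather than block
//   }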
}
}
}
})

return proc, nil
}

// runBootstrap builds up a list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {

var merr u.MultiErr

randomID := func() peer.ID {
// 16 random bytes is not a valid peer id. it may be fine because
// the dht will rehash to its own keyspace anyway.
id := make([]byte, 16)
rand.Read(id)
return peer.ID(id)
}

// bootstrap sequentially, as results will compound
runQuery := func(ctx context.Context, id peer.ID) {
p, err := dht.FindPeer(ctx, id)
if err == routing.ErrNotFound {
// this isn't an error. this is precisely what we expect.
} else if err != nil {
merr = append(merr, err)
} else {
// woah, actually found a peer with that ID? this shouldn't happen normally
// (as the ID we use is not a real ID). this is an odd error worth logging.
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
log.Errorf("%s", err)
merr = append(merr, err)
}
}

sequential := true
if sequential {
// these should normally be parallel, but we can make them sequential for debugging.
// note that the core/bootstrap context deadline should be extended too for that.
for i := 0; i < queries; i++ {
id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}

} else {
// note on parallelism here: the context is passed in to the queries, so they
// **should** exit when its deadline is exceeded, making this function exit on
// ctx cancel. normally, we should be selecting on ctx.Done() here too, but this
// gets complicated to do with a WaitGroup, and doesn't wait for the children to
// exit. note also that runQuery appends to merr, so if this branch is ever
// enabled, merr needs a lock.
var wg sync.WaitGroup
for i := 0; i < queries; i++ {
wg.Add(1)
go func(i int) { // pass i by value: the loop variable is shared across iterations
defer wg.Done()

id := randomID()
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
runQuery(ctx, id)
}(i)
}
wg.Wait()
}

if len(merr) > 0 {
return merr
}
return nil
}
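Because BootstrapOnSignal accepts any <-chan time.Time, a test can drive bootstrap rounds manually instead of waiting on a ticker, as the doc comment above suggests. A minimal sketch under that assumption (the query count is arbitrary, and t is a hypothetical *testing.T):

signal := make(chan time.Time)
proc, err := dht.BootstrapOnSignal(4, signal)
if err != nil {
	t.Fatal(err)
}
signal <- time.Now() // triggers exactly one bootstrap round
proc.Close()         // shuts the bootstrapper down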
