Skip to content

Commit

Permalink
core: cleaned up bootstrap process
Browse files Browse the repository at this point in the history
  • Loading branch information
jbenet committed Jan 23, 2015
1 parent dd9c1b6 commit 95d58b2
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 221 deletions.
184 changes: 93 additions & 91 deletions core/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package core
import (
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"sync"
"time"
Expand All @@ -18,153 +20,141 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)

// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
// peers to bootstrap correctly.
var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")

const (
// BootstrapPeriod governs the periodic interval at which the node will
// attempt to bootstrap. The bootstrap process is not very expensive, so
// this threshold can afford to be small (<=30s).
BootstrapPeriod = 30 * time.Second
// BootstrapConfig specifies parameters used in an IpfsNode's network
// bootstrapping process.
type BootstrapConfig struct {

// BootstrapPeerThreshold governs the node Bootstrap process. If the node
// has less open connections than this number, it will open connections
// MinPeerThreshold governs whether to bootstrap more connections. If the
// node has less open connections than this number, it will open connections
// to the bootstrap nodes. From there, the routing system should be able
// to use the connections to the bootstrap nodes to connect to even more
// peers. Routing systems like the IpfsDHT do so in their own Bootstrap
// process, which issues random queries to find more peers.
BootstrapPeerThreshold = 4
MinPeerThreshold int

// Period governs the periodic interval at which the node will
// attempt to bootstrap. The bootstrap process is not very expensive, so
// this threshold can afford to be small (<=30s).
Period time.Duration

// BootstrapConnectionTimeout determines how long to wait for a bootstrap
// ConnectionTimeout determines how long to wait for a bootstrap
// connection attempt before cancelling it.
BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3
)
ConnectionTimeout time.Duration

// BootstrapPeers is a function that returns a set of bootstrap peers
// for the bootstrap process to use. This makes it possible for clients
// to control the peers the process uses at any moment.
BootstrapPeers func() []peer.PeerInfo
}

// nodeBootstrapper is a small object used to bootstrap an IpfsNode.
type nodeBootstrapper struct {
node *IpfsNode
// DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
var DefaultBootstrapConfig = BootstrapConfig{
MinPeerThreshold: 4,
Period: 30 * time.Second,
ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3
}

// TryToBootstrap starts IpfsNode bootstrapping. This function will run an
// initial bootstrapping phase before exiting: connect to several bootstrap
// nodes. This allows callers to call this function synchronously to:
// - check if an error occurrs (bootstrapping unsuccessful)
// - wait before starting services which require the node to be bootstrapped
//
// If bootstrapping initially fails, Bootstrap() will try again for a total of
// three times, before giving up completely. Note that in environments where a
// node may be initialized offline, as normal operation, BootstrapForever()
// should be used instead.
//
// Note: this function could be much cleaner if we were to relax the constraint
// that we want to exit **after** we have performed initial bootstrapping (and are
// thus connected to nodes). The constraint may not be that useful in practice.
// Consider cases when we initialize the node while disconnected from the internet.
// We don't want this launch to fail... want to continue launching the node, hoping
// that bootstrapping will work in the future if we get connected.
func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error {
n := nb.node
func BootstrapConfigWithPeers(pis []peer.PeerInfo) BootstrapConfig {
cfg := DefaultBootstrapConfig
cfg.BootstrapPeers = func() []peer.PeerInfo {
return pis
}
return cfg
}

// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically
// check the number of open connections and -- if there are too few -- initiate
// connections to well-known bootstrap peers. It also kicks off subsystem
// bootstrapping (i.e. routing).
func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {

// TODO what bootstrapping should happen if there is no DHT? i.e. we could
// continue connecting to our bootstrap peers, but for what purpose? for now
// simply exit without connecting to any of them. When we introduce another
// routing system that uses bootstrap peers we can change this.
dht, ok := n.Routing.(*dht.IpfsDHT)
thedht, ok := n.Routing.(*dht.IpfsDHT)
if !ok {
return nil
return ioutil.NopCloser(nil), nil
}

for i := 0; i < 3; i++ {
if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil {
return err
// the periodic bootstrap function -- the connection supervisor
periodic := func(worker goprocess.Process) {
ctx := procctx.WithProcessClosing(context.Background(), worker)
defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done()

if err := bootstrapRound(ctx, n.PeerHost, thedht, n.Peerstore, cfg); err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", n.Identity, err)
}
}

// at this point we have done at least one round of initial bootstrap.
// we're ready to kick off dht bootstrapping.
dbproc, err := dht.Bootstrap(ctx)
// kick off the node's periodic bootstrapping
proc := periodicproc.Tick(cfg.Period, periodic)
proc.Go(periodic) // run one right now.

// kick off dht bootstrapping.
dbproc, err := thedht.Bootstrap(dht.DefaultBootstrapConfig)
if err != nil {
return err
proc.Close()
return nil, err
}

// kick off the node's periodic bootstrapping
proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) {
defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done()
if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil {
log.Error(err)
}
})

// add dht bootstrap proc as a child, so it is closed automatically when we are.
proc.AddChild(dbproc)

// we were given a context. instead of returning proc for the caller
// to manage, for now we just close the proc when context is done.
go func() {
<-ctx.Done()
proc.Close()
}()
return nil
}

// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(),
// BootstrapForever() will run indefinitely (until its context is cancelled).
// This is particularly useful for the daemon and other services, which may
// be started offline and will come online at a future date.
//
// TODO: check offline --to--> online case works well and doesn't hurt perf.
// We may still be dialing. We should check network config.
func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error {
for {
if err := nb.TryToBootstrap(ctx, peers); err == nil {
return nil
}
}
return proc, nil
}

func bootstrapRound(ctx context.Context,
host host.Host,
route *dht.IpfsDHT,
peerstore peer.Peerstore,
bootstrapPeers []peer.PeerInfo) error {
cfg BootstrapConfig) error {

ctx, _ = context.WithTimeout(ctx, cfg.ConnectionTimeout)
id := host.ID()

ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout)
// get bootstrap peers from config. retrieving them here makes
// sure we remain observant of changes to client configuration.
peers := cfg.BootstrapPeers()

// determine how many bootstrap connections to open
connectedPeers := host.Network().Peers()
if len(connectedPeers) >= BootstrapPeerThreshold {
log.Event(ctx, "bootstrapSkip", host.ID())
connected := host.Network().Peers()
if len(connected) >= cfg.MinPeerThreshold {
log.Event(ctx, "bootstrapSkip", id)
log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
host.ID(), len(connectedPeers), BootstrapPeerThreshold)
id, len(connected), cfg.MinPeerThreshold)
return nil
}
numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers)
numToDial := cfg.MinPeerThreshold - len(connected)

// filter out bootstrap nodes we are already connected to
var notConnected []peer.PeerInfo
for _, p := range bootstrapPeers {
for _, p := range peers {
if host.Network().Connectedness(p.ID) != inet.Connected {
notConnected = append(notConnected, p)
}
}

// if connected to all bootstrap peer candidates, exit
if len(notConnected) < 1 {
log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate)
log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
return ErrNotEnoughBootstrapPeers
}

// connect to a random susbset of bootstrap candidates
randomSubset := randomSubsetOfPeers(notConnected, numCxnsToCreate)
defer log.EventBegin(ctx, "bootstrapStart", host.ID()).Done()
log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset)
if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil {
log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", host.ID(), err)
randSubset := randomSubsetOfPeers(notConnected, numToDial)

defer log.EventBegin(ctx, "bootstrapStart", id).Done()
log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
if err := bootstrapConnect(ctx, peerstore, route, randSubset); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -196,12 +186,12 @@ func bootstrapConnect(ctx context.Context,
ps.AddAddresses(p.ID, p.Addrs)
err := route.Connect(ctx, p.ID)
if err != nil {
log.Event(ctx, "bootstrapFailed", p.ID)
log.Event(ctx, "bootstrapDialFailed", p.ID)
log.Errorf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
return
}
log.Event(ctx, "bootstrapSuccess", p.ID)
log.Event(ctx, "bootstrapDialSuccess", p.ID)
log.Infof("bootstrapped with %v", p.ID)
}(p)
}
Expand All @@ -223,7 +213,19 @@ func bootstrapConnect(ctx context.Context,
return nil
}

func toPeer(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) {
func toPeerInfos(bpeers []config.BootstrapPeer) ([]peer.PeerInfo, error) {
var peers []peer.PeerInfo
for _, bootstrap := range bpeers {
p, err := toPeerInfo(bootstrap)
if err != nil {
return nil, err
}
peers = append(peers, p)
}
return peers, nil
}

func toPeerInfo(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) {
id, err := peer.IDB58Decode(bootstrap.PeerID)
if err != nil {
return
Expand Down
Loading

0 comments on commit 95d58b2

Please sign in to comment.