diff --git a/core/bootstrap.go b/core/bootstrap.go index f8da807bf71d..74b2253ffe27 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -2,6 +2,7 @@ package core import ( "errors" + "fmt" "math/rand" "sync" "time" @@ -16,109 +17,187 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) +// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap +// peers to bootstrap correctly. +var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") + const ( - period = 30 * time.Second // how often to check connection status - connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect - recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value - numDHTBootstrapQueries = 15 // number of DHT queries to execute + // BootstrapPeriod governs the periodic interval at which the node will + // attempt to bootstrap. The bootstrap process is not very expensive, so + // this threshold can afford to be small (<=30s). + BootstrapPeriod = 30 * time.Second + + // BootstrapPeerThreshold governs the node Bootstrap process. If the node + // has less open connections than this number, it will open connections + // to the bootstrap nodes. From there, the routing system should be able + // to use the connections to the bootstrap nodes to connect to even more + // peers. Routing systems like the IpfsDHT do so in their own Bootstrap + // process, which issues random queries to find more peers. + BootstrapPeerThreshold = 4 + + // BootstrapConnectionTimeout determines how long to wait for a bootstrap + // connection attempt before cancelling it. + BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3 ) -func superviseConnections(parent context.Context, - h host.Host, - route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes - store peer.Peerstore, - peers []peer.PeerInfo) error { +// nodeBootstrapper is a small object used to bootstrap an IpfsNode. +type nodeBootstrapper struct { + node *IpfsNode +} - var dhtAlreadyBootstrapping bool +// TryToBootstrap starts IpfsNode bootstrapping. This function will run an +// initial bootstrapping phase before exiting: connect to several bootstrap +// nodes. This allows callers to call this function synchronously to: +// - check if an error occurrs (bootstrapping unsuccessful) +// - wait before starting services which require the node to be bootstrapped +// +// If bootstrapping initially fails, Bootstrap() will try again for a total of +// three times, before giving up completely. Note that in environments where a +// node may be initialized offline, as normal operation, BootstrapForever() +// should be used instead. +// +// Note: this function could be much cleaner if we were to relax the constraint +// that we want to exit **after** we have performed initial bootstrapping (and are +// thus connected to nodes). The constraint may not be that useful in practice. +// Consider cases when we initialize the node while disconnected from the internet. +// We don't want this launch to fail... want to continue launching the node, hoping +// that bootstrapping will work in the future if we get connected. +func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error { + n := nb.node + + // TODO what bootstrapping should happen if there is no DHT? i.e. we could + // continue connecting to our bootstrap peers, but for what purpose? for now + // simply exit without connecting to any of them. When we introduce another + // routing system that uses bootstrap peers we can change this. + dht, ok := n.Routing.(*dht.IpfsDHT) + if !ok { + return nil + } - for { - ctx, _ := context.WithTimeout(parent, connectiontimeout) - // TODO get config from disk so |peers| always reflects the latest - // information - if err := bootstrap(ctx, h, route, store, peers); err != nil { - log.Error(err) + for i := 0; i < 3; i++ { + if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { + return err } + } - if !dhtAlreadyBootstrapping { - dhtAlreadyBootstrapping = true // only call dht.Bootstrap once. - if _, err := route.Bootstrap(); err != nil { - log.Error(err) - } + // at this point we have done at least one round of initial bootstrap. + // we're ready to kick off dht bootstrapping. + dbproc, err := dht.Bootstrap(ctx) + if err != nil { + return err + } + + // kick off the node's periodic bootstrapping + proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) { + if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { + log.Error(err) } + }) + + // add dht bootstrap proc as a child, so it is closed automatically when we are. + proc.AddChild(dbproc) + + // we were given a context. instead of returning proc for the caller + // to manage, for now we just close the proc when context is done. + go func() { + <-ctx.Done() + proc.Close() + }() + return nil +} - select { - case <-parent.Done(): - return parent.Err() - case <-time.Tick(period): +// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(), +// BootstrapForever() will run indefinitely (until its context is cancelled). +// This is particularly useful for the daemon and other services, which may +// be started offline and will come online at a future date. +// +// TODO: check offline --to--> online case works well and doesn't hurt perf. +// We may still be dialing. We should check network config. +func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error { + for { + if err := nb.TryToBootstrap(ctx, peers); err == nil { + return nil } } - return nil } -func bootstrap(ctx context.Context, - h host.Host, - r *dht.IpfsDHT, - ps peer.Peerstore, +func bootstrapRound(ctx context.Context, + host host.Host, + route *dht.IpfsDHT, + peerstore peer.Peerstore, bootstrapPeers []peer.PeerInfo) error { - connectedPeers := h.Network().Peers() - if len(connectedPeers) >= recoveryThreshold { - log.Event(ctx, "bootstrapSkip", h.ID()) - log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", - h.ID(), len(connectedPeers), recoveryThreshold) + ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout) + // determine how many bootstrap connections to open + connectedPeers := host.Network().Peers() + if len(connectedPeers) >= BootstrapPeerThreshold { + log.Event(ctx, "bootstrapSkip", host.ID()) + log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", + host.ID(), len(connectedPeers), BootstrapPeerThreshold) return nil } - numCxnsToCreate := recoveryThreshold - len(connectedPeers) - - log.Event(ctx, "bootstrapStart", h.ID()) - log.Debugf("%s core bootstrapping to %d more nodes", h.ID(), numCxnsToCreate) + numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers) + // filter out bootstrap nodes we are already connected to var notConnected []peer.PeerInfo for _, p := range bootstrapPeers { - if h.Network().Connectedness(p.ID) != inet.Connected { + if host.Network().Connectedness(p.ID) != inet.Connected { notConnected = append(notConnected, p) } } - // if not connected to all bootstrap peer candidates - if len(notConnected) > 0 { - var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) - log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset) - if err := connect(ctx, ps, r, randomSubset); err != nil { - log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", h.ID(), err) - return err - } + // if connected to all bootstrap peer candidates, exit + if len(notConnected) < 1 { + log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate) + return ErrNotEnoughBootstrapPeers + } + + // connect to a random susbset of bootstrap candidates + var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) + log.Event(ctx, "bootstrapStart", host.ID()) + log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset) + if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil { + log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", host.ID(), err) + return err } return nil } -func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error { +func bootstrapConnect(ctx context.Context, + ps peer.Peerstore, + route *dht.IpfsDHT, + peers []peer.PeerInfo) error { if len(peers) < 1 { - return errors.New("bootstrap set empty") + return ErrNotEnoughBootstrapPeers } + errs := make(chan error, len(peers)) var wg sync.WaitGroup for _, p := range peers { // performed asynchronously because when performed synchronously, if // one `Connect` call hangs, subsequent calls are more likely to // fail/abort due to an expiring context. + // Also, performed asynchronously for dial speed. wg.Add(1) go func(p peer.PeerInfo) { defer wg.Done() - log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID) - log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID) + log.Event(ctx, "bootstrapDial", route.LocalPeer(), p.ID) + log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID) ps.AddAddresses(p.ID, p.Addrs) - err := r.Connect(ctx, p.ID) + err := route.Connect(ctx, p.ID) if err != nil { log.Event(ctx, "bootstrapFailed", p.ID) - log.Criticalf("failed to bootstrap with %v: %s", p.ID, err) + log.Errorf("failed to bootstrap with %v: %s", p.ID, err) + errs <- err return } log.Event(ctx, "bootstrapSuccess", p.ID) @@ -126,6 +205,20 @@ func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []pee }(p) } wg.Wait() + + // our failure condition is when no connection attempt succeeded. + // So drain the errs channel, counting the results. + close(errs) + count := 0 + var err error + for err = range errs { + if err != nil { + count++ + } + } + if count == len(peers) { + return fmt.Errorf("failed to bootstrap. %s", err) + } return nil } diff --git a/core/core.go b/core/core.go index a2e03f2155a4..d5ece35e9bf6 100644 --- a/core/core.go +++ b/core/core.go @@ -297,30 +297,12 @@ func (n *IpfsNode) Resolve(k util.Key) (*merkledag.Node, error) { func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error { // TODO what should return value be when in offlineMode? - if n.Routing == nil { return nil } - // TODO what bootstrapping should happen if there is no DHT? i.e. we could - // continue connecting to our bootstrap peers, but for what purpose? - dhtRouting, ok := n.Routing.(*dht.IpfsDHT) - if !ok { - return nil - } - - // TODO consider moving connection supervision into the Network. We've - // discussed improvements to this Node constructor. One improvement - // would be to make the node configurable, allowing clients to inject - // an Exchange, Network, or Routing component and have the constructor - // manage the wiring. In that scenario, this dangling function is a bit - // awkward. - - // spin off the node's connection supervisor. - // TODO, clean up how this thing works. Make the superviseConnections thing - // work like the DHT.Bootstrap. - go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, peers) - return nil + nb := nodeBootstrapper{n} + return nb.TryToBootstrap(ctx, peers) } func (n *IpfsNode) loadID() error { diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go index 095c194d676e..c3991972ce4f 100644 --- a/routing/dht/dht_bootstrap.go +++ b/routing/dht/dht_bootstrap.go @@ -14,6 +14,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) // DefaultBootstrapQueries specifies how many queries to run, @@ -54,9 +55,9 @@ const DefaultBootstrapTimeout = time.Duration(10 * time.Second) // and connected to at least a few nodes. // // Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it. -func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) { +func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) { - if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil { + if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil { return nil, err } @@ -79,41 +80,32 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop return nil, fmt.Errorf("invalid signal: %v", signal) } - proc := goprocess.Go(func(worker goprocess.Process) { - defer log.Debug("dht bootstrapper shutting down") - for { - select { - case <-worker.Closing(): - return - - case <-signal: - // it would be useful to be able to send out signals of when we bootstrap, too... - // maybe this is a good case for whole module event pub/sub? - - ctx := dht.Context() - if err := dht.runBootstrap(ctx, queries); err != nil { - log.Error(err) - // A bootstrapping error is important to notice but not fatal. - // maybe the client should be able to consume these errors, - // though I dont have a clear use case in mind-- what **could** - // the client do if one of the bootstrap calls fails? - // - // This is also related to the core's bootstrap failures. - // superviseConnections should perhaps allow clients to detect - // bootstrapping problems. - // - // Anyway, passing errors could be done with a bootstrapper object. - // this would imply the client should be able to consume a lot of - // other non-fatal dht errors too. providing this functionality - // should be done correctly DHT-wide. - // NB: whatever the design, clients must ensure they drain errors! - // This pattern is common to many things, perhaps long-running services - // should have something like an ErrStream that allows clients to consume - // periodic errors and take action. It should allow the user to also - // ignore all errors with something like an ErrStreamDiscard. We should - // study what other systems do for ideas. - } - } + proc := periodicproc.Ticker(signal, func(worker goprocess.Process) { + // it would be useful to be able to send out signals of when we bootstrap, too... + // maybe this is a good case for whole module event pub/sub? + + ctx := dht.Context() + if err := dht.runBootstrap(ctx, queries); err != nil { + log.Error(err) + // A bootstrapping error is important to notice but not fatal. + // maybe the client should be able to consume these errors, + // though I dont have a clear use case in mind-- what **could** + // the client do if one of the bootstrap calls fails? + // + // This is also related to the core's bootstrap failures. + // superviseConnections should perhaps allow clients to detect + // bootstrapping problems. + // + // Anyway, passing errors could be done with a bootstrapper object. + // this would imply the client should be able to consume a lot of + // other non-fatal dht errors too. providing this functionality + // should be done correctly DHT-wide. + // NB: whatever the design, clients must ensure they drain errors! + // This pattern is common to many things, perhaps long-running services + // should have something like an ErrStream that allows clients to consume + // periodic errors and take action. It should allow the user to also + // ignore all errors with something like an ErrStreamDiscard. We should + // study what other systems do for ideas. } })