From 95d58b2a4a79c308def0afb67d9688bb33ee46cb Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 23 Jan 2015 04:36:18 -0800 Subject: [PATCH] core: cleaned up bootstrap process --- core/bootstrap.go | 184 +++++++++++++------------ core/core.go | 113 +++++++-------- routing/dht/dht_bootstrap.go | 112 ++++++--------- test/epictest/addcat_test.go | 11 +- test/epictest/three_legged_cat_test.go | 12 +- 5 files changed, 211 insertions(+), 221 deletions(-) diff --git a/core/bootstrap.go b/core/bootstrap.go index b1b0bfaa4d1..84bf8e65203 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -3,6 +3,8 @@ package core import ( "errors" "fmt" + "io" + "io/ioutil" "math/rand" "sync" "time" @@ -18,6 +20,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) @@ -25,128 +28,116 @@ import ( // peers to bootstrap correctly. var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") -const ( - // BootstrapPeriod governs the periodic interval at which the node will - // attempt to bootstrap. The bootstrap process is not very expensive, so - // this threshold can afford to be small (<=30s). - BootstrapPeriod = 30 * time.Second +// BootstrapConfig specifies parameters used in an IpfsNode's network +// bootstrapping process. +type BootstrapConfig struct { - // BootstrapPeerThreshold governs the node Bootstrap process. If the node - // has less open connections than this number, it will open connections + // MinPeerThreshold governs whether to bootstrap more connections. If the + // node has less open connections than this number, it will open connections // to the bootstrap nodes. From there, the routing system should be able // to use the connections to the bootstrap nodes to connect to even more // peers. Routing systems like the IpfsDHT do so in their own Bootstrap // process, which issues random queries to find more peers. - BootstrapPeerThreshold = 4 + MinPeerThreshold int + + // Period governs the periodic interval at which the node will + // attempt to bootstrap. The bootstrap process is not very expensive, so + // this threshold can afford to be small (<=30s). + Period time.Duration - // BootstrapConnectionTimeout determines how long to wait for a bootstrap + // ConnectionTimeout determines how long to wait for a bootstrap // connection attempt before cancelling it. - BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3 -) + ConnectionTimeout time.Duration + + // BootstrapPeers is a function that returns a set of bootstrap peers + // for the bootstrap process to use. This makes it possible for clients + // to control the peers the process uses at any moment. + BootstrapPeers func() []peer.PeerInfo +} -// nodeBootstrapper is a small object used to bootstrap an IpfsNode. -type nodeBootstrapper struct { - node *IpfsNode +// DefaultBootstrapConfig specifies default sane parameters for bootstrapping. +var DefaultBootstrapConfig = BootstrapConfig{ + MinPeerThreshold: 4, + Period: 30 * time.Second, + ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3 } -// TryToBootstrap starts IpfsNode bootstrapping. 
This function will run an -// initial bootstrapping phase before exiting: connect to several bootstrap -// nodes. This allows callers to call this function synchronously to: -// - check if an error occurrs (bootstrapping unsuccessful) -// - wait before starting services which require the node to be bootstrapped -// -// If bootstrapping initially fails, Bootstrap() will try again for a total of -// three times, before giving up completely. Note that in environments where a -// node may be initialized offline, as normal operation, BootstrapForever() -// should be used instead. -// -// Note: this function could be much cleaner if we were to relax the constraint -// that we want to exit **after** we have performed initial bootstrapping (and are -// thus connected to nodes). The constraint may not be that useful in practice. -// Consider cases when we initialize the node while disconnected from the internet. -// We don't want this launch to fail... want to continue launching the node, hoping -// that bootstrapping will work in the future if we get connected. -func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error { - n := nb.node +func BootstrapConfigWithPeers(pis []peer.PeerInfo) BootstrapConfig { + cfg := DefaultBootstrapConfig + cfg.BootstrapPeers = func() []peer.PeerInfo { + return pis + } + return cfg +} + +// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically +// check the number of open connections and -- if there are too few -- initiate +// connections to well-known bootstrap peers. It also kicks off subsystem +// bootstrapping (i.e. routing). +func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // TODO what bootstrapping should happen if there is no DHT? i.e. we could // continue connecting to our bootstrap peers, but for what purpose? for now // simply exit without connecting to any of them. When we introduce another // routing system that uses bootstrap peers we can change this. - dht, ok := n.Routing.(*dht.IpfsDHT) + thedht, ok := n.Routing.(*dht.IpfsDHT) if !ok { - return nil + return ioutil.NopCloser(nil), nil } - for i := 0; i < 3; i++ { - if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { - return err + // the periodic bootstrap function -- the connection supervisor + periodic := func(worker goprocess.Process) { + ctx := procctx.WithProcessClosing(context.Background(), worker) + defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() + + if err := bootstrapRound(ctx, n.PeerHost, thedht, n.Peerstore, cfg); err != nil { + log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.Identity, err) } } - // at this point we have done at least one round of initial bootstrap. - // we're ready to kick off dht bootstrapping. - dbproc, err := dht.Bootstrap(ctx) + // kick off the node's periodic bootstrapping + proc := periodicproc.Tick(cfg.Period, periodic) + proc.Go(periodic) // run one right now. + + // kick off dht bootstrapping. 
+ dbproc, err := thedht.Bootstrap(dht.DefaultBootstrapConfig) if err != nil { - return err + proc.Close() + return nil, err } - // kick off the node's periodic bootstrapping - proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) { - defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() - if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { - log.Error(err) - } - }) - // add dht bootstrap proc as a child, so it is closed automatically when we are. proc.AddChild(dbproc) - - // we were given a context. instead of returning proc for the caller - // to manage, for now we just close the proc when context is done. - go func() { - <-ctx.Done() - proc.Close() - }() - return nil -} - -// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(), -// BootstrapForever() will run indefinitely (until its context is cancelled). -// This is particularly useful for the daemon and other services, which may -// be started offline and will come online at a future date. -// -// TODO: check offline --to--> online case works well and doesn't hurt perf. -// We may still be dialing. We should check network config. -func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error { - for { - if err := nb.TryToBootstrap(ctx, peers); err == nil { - return nil - } - } + return proc, nil } func bootstrapRound(ctx context.Context, host host.Host, route *dht.IpfsDHT, peerstore peer.Peerstore, - bootstrapPeers []peer.PeerInfo) error { + cfg BootstrapConfig) error { + + ctx, _ = context.WithTimeout(ctx, cfg.ConnectionTimeout) + id := host.ID() - ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout) + // get bootstrap peers from config. retrieving them here makes + // sure we remain observant of changes to client configuration. 
+ peers := cfg.BootstrapPeers() // determine how many bootstrap connections to open - connectedPeers := host.Network().Peers() - if len(connectedPeers) >= BootstrapPeerThreshold { - log.Event(ctx, "bootstrapSkip", host.ID()) + connected := host.Network().Peers() + if len(connected) >= cfg.MinPeerThreshold { + log.Event(ctx, "bootstrapSkip", id) log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", - host.ID(), len(connectedPeers), BootstrapPeerThreshold) + id, len(connected), cfg.MinPeerThreshold) return nil } - numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers) + numToDial := cfg.MinPeerThreshold - len(connected) // filter out bootstrap nodes we are already connected to var notConnected []peer.PeerInfo - for _, p := range bootstrapPeers { + for _, p := range peers { if host.Network().Connectedness(p.ID) != inet.Connected { notConnected = append(notConnected, p) } @@ -154,17 +145,16 @@ func bootstrapRound(ctx context.Context, // if connected to all bootstrap peer candidates, exit if len(notConnected) < 1 { - log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate) + log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial) return ErrNotEnoughBootstrapPeers } // connect to a random susbset of bootstrap candidates - randomSubset := randomSubsetOfPeers(notConnected, numCxnsToCreate) - defer log.EventBegin(ctx, "bootstrapStart", host.ID()).Done() - log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset) - if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil { - log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", host.ID(), err) + randSubset := randomSubsetOfPeers(notConnected, numToDial) + + defer log.EventBegin(ctx, "bootstrapStart", id).Done() + log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset) + if err := bootstrapConnect(ctx, peerstore, route, randSubset); err != nil { return err } return nil @@ -196,12 +186,12 @@ func bootstrapConnect(ctx context.Context, ps.AddAddresses(p.ID, p.Addrs) err := route.Connect(ctx, p.ID) if err != nil { - log.Event(ctx, "bootstrapFailed", p.ID) + log.Event(ctx, "bootstrapDialFailed", p.ID) log.Errorf("failed to bootstrap with %v: %s", p.ID, err) errs <- err return } - log.Event(ctx, "bootstrapSuccess", p.ID) + log.Event(ctx, "bootstrapDialSuccess", p.ID) log.Infof("bootstrapped with %v", p.ID) }(p) } @@ -223,7 +213,19 @@ func bootstrapConnect(ctx context.Context, return nil } -func toPeer(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) { +func toPeerInfos(bpeers []config.BootstrapPeer) ([]peer.PeerInfo, error) { + var peers []peer.PeerInfo + for _, bootstrap := range bpeers { + p, err := toPeerInfo(bootstrap) + if err != nil { + return nil, err + } + peers = append(peers, p) + } + return peers, nil +} + +func toPeerInfo(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) { id, err := peer.IDB58Decode(bootstrap.PeerID) if err != nil { return diff --git a/core/core.go b/core/core.go index 0bf6d394026..a2839f610dd 100644 --- a/core/core.go +++ b/core/core.go @@ -11,33 +11,36 @@ import ( datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" + debugerror "github.com/jbenet/go-ipfs/util/debugerror" + + diag "github.com/jbenet/go-ipfs/diagnostics" + ic 
"github.com/jbenet/go-ipfs/p2p/crypto" + p2phost "github.com/jbenet/go-ipfs/p2p/host" + p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic" + swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" + addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" + peer "github.com/jbenet/go-ipfs/p2p/peer" + + routing "github.com/jbenet/go-ipfs/routing" + dht "github.com/jbenet/go-ipfs/routing/dht" + offroute "github.com/jbenet/go-ipfs/routing/offline" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bserv "github.com/jbenet/go-ipfs/blockservice" - diag "github.com/jbenet/go-ipfs/diagnostics" exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" offline "github.com/jbenet/go-ipfs/exchange/offline" rp "github.com/jbenet/go-ipfs/exchange/reprovide" + mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" - ic "github.com/jbenet/go-ipfs/p2p/crypto" - p2phost "github.com/jbenet/go-ipfs/p2p/host" - p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic" - swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" - addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" - peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" pin "github.com/jbenet/go-ipfs/pin" repo "github.com/jbenet/go-ipfs/repo" config "github.com/jbenet/go-ipfs/repo/config" - routing "github.com/jbenet/go-ipfs/routing" - dht "github.com/jbenet/go-ipfs/routing/dht" - offroute "github.com/jbenet/go-ipfs/routing/offline" - eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" - debugerror "github.com/jbenet/go-ipfs/util/debugerror" - lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) const IpnsValidatorTag = "ipns" @@ -75,13 +78,14 @@ type IpfsNode struct { Resolver *path.Resolver // the path resolution system // Online - PrivateKey ic.PrivKey // the local node's private Key - PeerHost p2phost.Host // the network host (server+client) - Routing routing.IpfsRouting // the routing system. recommend ipfs-dht - Exchange exchange.Interface // the block exchange + strategy (bitswap) - Namesys namesys.NameSystem // the name system, resolves paths to hashes - Diagnostics *diag.Diagnostics // the diagnostics service - Reprovider *rp.Reprovider // the value reprovider system + PrivateKey ic.PrivKey // the local node's private Key + PeerHost p2phost.Host // the network host (server+client) + Bootstrapper io.Closer // the periodic bootstrapper + Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + Exchange exchange.Interface // the block exchange + strategy (bitswap) + Namesys namesys.NameSystem // the name system, resolves paths to hashes + Diagnostics *diag.Diagnostics // the diagnostics service + Reprovider *rp.Reprovider // the value reprovider system ctxgroup.ContextGroup @@ -238,14 +242,7 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) - // prepare bootstrap peers from config - bpeers, err := n.loadBootstrapPeers() - if err != nil { - log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", n.Identity, err) - return debugerror.Wrap(err) - } - return n.Bootstrap(ctx, bpeers) + return n.Bootstrap(DefaultBootstrapConfig) } // teardown closes owned children. 
If any errors occur, this function returns
@@ -254,20 +251,20 @@ func (n *IpfsNode) teardown() error {
 	// owned objects are closed in this teardown to ensure that they're closed
 	// regardless of which constructor was used to add them to the node.
 	var closers []io.Closer
-	if n.Repo != nil {
-		closers = append(closers, n.Repo)
-	}
-	if n.Blocks != nil {
-		closers = append(closers, n.Blocks)
-	}
-	if n.Routing != nil {
-		if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
-			closers = append(closers, dht)
+	addCloser := func(c io.Closer) {
+		if c != nil {
+			closers = append(closers, c)
 		}
 	}
-	if n.PeerHost != nil {
-		closers = append(closers, n.PeerHost)
+
+	addCloser(n.Bootstrapper)
+	addCloser(n.Repo)
+	addCloser(n.Blocks)
+	if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
+		addCloser(dht)
 	}
+	addCloser(n.PeerHost)
+
 	var errs []error
 	for _, closer := range closers {
 		if err := closer.Close(); err != nil {
@@ -293,16 +290,34 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) {
 	return n.Resolver.ResolvePath(path)
 }
 
-// Bootstrap is undefined when node is not in OnlineMode
-func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {
+func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
 
 	// TODO what should return value be when in offlineMode?
 	if n.Routing == nil {
 		return nil
 	}
-	nb := nodeBootstrapper{n}
-	return nb.TryToBootstrap(ctx, peers)
+
+	if n.Bootstrapper != nil {
+		n.Bootstrapper.Close() // stop previous bootstrap process.
+	}
+
+	// if the caller did not specify a bootstrap peer function, get the
+	// freshest bootstrap peers from config. this responds to live changes.
+	if cfg.BootstrapPeers == nil {
+		cfg.BootstrapPeers = func() []peer.PeerInfo {
+			bpeers := n.Repo.Config().Bootstrap
+			ps, err := toPeerInfos(bpeers)
+			if err != nil {
+				log.Errorf("failed to parse bootstrap peers from config: %s", bpeers)
+				return nil
+			}
+			return ps
+		}
+	}
+
+	var err error
+	n.Bootstrapper, err = Bootstrap(n, cfg)
+	return err
 }
 
 func (n *IpfsNode) loadID() error {
@@ -342,18 +357,6 @@ func (n *IpfsNode) loadPrivateKey() error {
 	return nil
 }
 
-func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
-	var peers []peer.PeerInfo
-	for _, bootstrap := range n.Repo.Config().Bootstrap {
-		p, err := toPeer(bootstrap)
-		if err != nil {
-			return nil, err
-		}
-		peers = append(peers, p)
-	}
-	return peers, nil
-}
-
 // SetupOfflineRouting loads the local nodes private key and
 // uses it to instantiate a routing system in offline mode.
 // This is primarily used for offline ipns modifications.
diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go
index 588bcfd754d..c91df05e5f1 100644
--- a/routing/dht/dht_bootstrap.go
+++ b/routing/dht/dht_bootstrap.go
@@ -17,52 +17,42 @@ import (
 	periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
 )
 
-// DefaultBootstrapQueries specifies how many queries to run,
-// if the user does not specify a different number as an option.
+// BootstrapConfig specifies parameters used when bootstrapping the DHT.
 //
-// For now, this is set to 16 queries, which is an aggressive number.
-// We are currently more interested in ensuring we have a properly formed
-// DHT than making sure our dht minimizes traffic. Once we are more certain
-// of our implementation's robustness, we should lower this down to 8 or 4.
-//
-// Note there is also a tradeoff between the bootstrap period and the number
-// of queries. We could support a higher period with a smaller number of
-// queries
-const DefaultBootstrapQueries = 1
+// Note there is a tradeoff between the bootstrap period and the
+// number of queries. We could support a higher period with less
+// queries.
+type BootstrapConfig struct {
+	Queries int           // how many queries to run per period
+	Period  time.Duration // how often to run periodic bootstrap.
+	Timeout time.Duration // how long to wait for a bootstrap query to run
+}
 
-// DefaultBootstrapPeriod specifies how often to periodically run bootstrap,
-// if the user does not specify a different number as an option.
-//
-// For now, this is set to 10 seconds, which is an aggressive period. We are
-// We are currently more interested in ensuring we have a properly formed
-// DHT than making sure our dht minimizes traffic. Once we are more certain
-// implementation's robustness, we should lower this down to 30s or 1m.
-//
-// Note there is also a tradeoff between the bootstrap period and the number
-// of queries. We could support a higher period with a smaller number of
-// queries
-const DefaultBootstrapPeriod = time.Duration(10 * time.Second)
-
-// DefaultBootstrapTimeout specifies how long to wait for a bootstrap query
-// to run.
-const DefaultBootstrapTimeout = time.Duration(10 * time.Second)
-
-// Bootstrap runs bootstrapping once, then calls SignalBootstrap with default
-// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
-// the user to catch an error off the bat if the connections are faulty. It also
-// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
-// for instrumenting it on tests, or delaying bootstrap until the network is online
-// and connected to at least a few nodes.
-//
-// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
-func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
+var DefaultBootstrapConfig = BootstrapConfig{
+	// For now, this is set to 1 query.
+	// We are currently more interested in ensuring we have a properly formed
+	// DHT than making sure our dht minimizes traffic. Once we are more certain
+	// of our implementation's robustness, we should lower this down to 8 or 4.
+	Queries: 1,
 
-	if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil {
-		return nil, err
-	}
+	// For now, this is set to 20 seconds, which is an aggressive period. We are
+	// currently more interested in ensuring we have a properly formed DHT
+	// than making sure our dht minimizes traffic. Once we are more certain of
+	// our implementation's robustness, we should lower this down to 30s or 1m.
+	Period: time.Duration(20 * time.Second),
 
-	sig := time.Tick(DefaultBootstrapPeriod)
-	return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
+	Timeout: time.Duration(20 * time.Second),
+}
+
+// Bootstrap ensures the dht routing table remains healthy as peers come and go.
+// It builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process will run a number of queries each time, and will run every time the
+// signal fires. These parameters are configurable.
+//
+// Bootstrap returns a process, so the user can stop it.
+func (dht *IpfsDHT) Bootstrap(config BootstrapConfig) (goprocess.Process, error) {
+	sig := time.Tick(config.Period)
+	return dht.BootstrapOnSignal(config, sig)
 }
 
 // SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
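For reference, a minimal sketch of how a caller might drive the DHT bootstrap API added above. It assumes it sits alongside the code in routing/dht (so the time and goprocess imports of that file apply); the helper name and the tuned values are hypothetical, and only BootstrapConfig, DefaultBootstrapConfig and (*IpfsDHT).Bootstrap come from this patch:

	// startDHTBootstrap starts periodic DHT bootstrapping with slightly
	// gentler settings than the defaults. Sketch only.
	func startDHTBootstrap(d *IpfsDHT) (goprocess.Process, error) {
		cfg := DefaultBootstrapConfig
		cfg.Queries = 4              // a few more random-ID queries per round
		cfg.Period = 1 * time.Minute // run rounds less often than every 20s

		proc, err := d.Bootstrap(cfg) // fails if, e.g., cfg.Queries <= 0
		if err != nil {
			return nil, err
		}
		// The caller owns the returned process; proc.Close() stops the
		// periodic bootstrapping when it is no longer needed.
		return proc, nil
	}
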
@@ -71,9 +61,9 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) { // These parameters are configurable. // // SignalBootstrap returns a process, so the user can stop it. -func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) { - if queries <= 0 { - return nil, fmt.Errorf("invalid number of queries: %d", queries) +func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) { + if cfg.Queries <= 0 { + return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) } if signal == nil { @@ -85,27 +75,9 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop // maybe this is a good case for whole module event pub/sub? ctx := dht.Context() - if err := dht.runBootstrap(ctx, queries); err != nil { + if err := dht.runBootstrap(ctx, cfg); err != nil { log.Error(err) // A bootstrapping error is important to notice but not fatal. - // maybe the client should be able to consume these errors, - // though I dont have a clear use case in mind-- what **could** - // the client do if one of the bootstrap calls fails? - // - // This is also related to the core's bootstrap failures. - // superviseConnections should perhaps allow clients to detect - // bootstrapping problems. - // - // Anyway, passing errors could be done with a bootstrapper object. - // this would imply the client should be able to consume a lot of - // other non-fatal dht errors too. providing this functionality - // should be done correctly DHT-wide. - // NB: whatever the design, clients must ensure they drain errors! - // This pattern is common to many things, perhaps long-running services - // should have something like an ErrStream that allows clients to consume - // periodic errors and take action. It should allow the user to also - // ignore all errors with something like an ErrStreamDiscard. We should - // study what other systems do for ideas. } }) @@ -113,7 +85,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop } // runBootstrap builds up list of peers by requesting random peer IDs -func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { +func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { bslog := func(msg string) { log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) } @@ -133,7 +105,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { } // bootstrap sequentially, as results will compound - ctx, cancel := context.WithTimeout(ctx, DefaultBootstrapTimeout) + ctx, cancel := context.WithTimeout(ctx, cfg.Timeout) defer cancel() runQuery := func(ctx context.Context, id peer.ID) { p, err := dht.FindPeer(ctx, id) @@ -154,9 +126,9 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { if sequential { // these should be parallel normally. but can make them sequential for debugging. // note that the core/bootstrap context deadline should be extended too for that. 
- for i := 0; i < queries; i++ { + for i := 0; i < cfg.Queries; i++ { id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) runQuery(ctx, id) } @@ -166,13 +138,13 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { // normally, we should be selecting on ctx.Done() here too, but this gets // complicated to do with WaitGroup, and doesnt wait for the children to exit. var wg sync.WaitGroup - for i := 0; i < queries; i++ { + for i := 0; i < cfg.Queries; i++ { wg.Add(1) go func() { defer wg.Done() id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) runQuery(ctx, id) }() } diff --git a/test/epictest/addcat_test.go b/test/epictest/addcat_test.go index 047c312e941..9c5486ed62b 100644 --- a/test/epictest/addcat_test.go +++ b/test/epictest/addcat_test.go @@ -115,8 +115,15 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { } defer catter.Close() - catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}) - adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}) + bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} + bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} + + if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + return err + } + if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + return err + } keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) if err != nil { diff --git a/test/epictest/three_legged_cat_test.go b/test/epictest/three_legged_cat_test.go index c86403ff70b..ff3f036a372 100644 --- a/test/epictest/three_legged_cat_test.go +++ b/test/epictest/three_legged_cat_test.go @@ -62,9 +62,15 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { return err } defer bootstrap.Close() - boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) - adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) - catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) + + bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) + bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis}) + if err := adder.Bootstrap(bcfg); err != nil { + return err + } + if err := catter.Bootstrap(bcfg); err != nil { + return err + } keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) if err != nil {
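Taken together with the core/core.go changes, the test updates above show the intended call pattern: build a BootstrapConfig (optionally pinned to an explicit peer set) and hand it to IpfsNode.Bootstrap, which owns the resulting supervisor through the new Bootstrapper field. A rough sketch with hypothetical helper names; only BootstrapConfigWithPeers, DefaultBootstrapConfig, Bootstrap and Bootstrapper come from this patch:

	package example // hypothetical package, illustrating the API surface only

	import (
		core "github.com/jbenet/go-ipfs/core"
		peer "github.com/jbenet/go-ipfs/p2p/peer"
	)

	// bootstrapTo points n at another already-running node, the way the
	// epictest cases above do.
	func bootstrapTo(n, other *core.IpfsNode) error {
		infos := []peer.PeerInfo{other.Peerstore.PeerInfo(other.Identity)}
		return n.Bootstrap(core.BootstrapConfigWithPeers(infos))
	}

	// bootstrapFromConfig falls back to the repo configuration; the peer
	// list is re-read on every round, so live config edits are picked up.
	func bootstrapFromConfig(n *core.IpfsNode) error {
		return n.Bootstrap(core.DefaultBootstrapConfig)
	}

Calling Bootstrap again closes the previous supervisor, and node teardown closes the current one, so callers normally do not manage n.Bootstrapper directly.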