From 82d38a269237895150a2c2207f189e440b85883f Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 16 Jan 2015 12:52:12 -0800 Subject: [PATCH 01/20] routing/dht: periodic bootstrapping #572 --- core/bootstrap.go | 2 +- routing/dht/dht.go | 63 ------------ routing/dht/dht_bootstrap.go | 181 ++++++++++++++++++++++++++++++++++ routing/dht/dht_test.go | 184 +++++++++++++++++++++++++++-------- 4 files changed, 328 insertions(+), 102 deletions(-) create mode 100644 routing/dht/dht_bootstrap.go diff --git a/core/bootstrap.go b/core/bootstrap.go index 72a8be52508..e52188b5029 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -86,7 +86,7 @@ func bootstrap(ctx context.Context, // we can try running dht bootstrap even if we're connected to all bootstrap peers. if len(h.Network().Conns()) > 0 { - if err := r.Bootstrap(ctx, numDHTBootstrapQueries); err != nil { + if _, err := r.Bootstrap(); err != nil { // log this as Info. later on, discern better between errors. log.Infof("dht bootstrap err: %s", err) return nil diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 923c8c69b71..0fd5177a2ce 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -370,66 +370,3 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { } } } - -// Bootstrap builds up list of peers by requesting random peer IDs -func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error { - var merr u.MultiErr - - randomID := func() peer.ID { - // 16 random bytes is not a valid peer id. it may be fine becuase - // the dht will rehash to its own keyspace anyway. - id := make([]byte, 16) - rand.Read(id) - return peer.ID(id) - } - - // bootstrap sequentially, as results will compound - runQuery := func(ctx context.Context, id peer.ID) { - p, err := dht.FindPeer(ctx, id) - if err == routing.ErrNotFound { - // this isn't an error. this is precisely what we expect. - } else if err != nil { - merr = append(merr, err) - } else { - // woah, actually found a peer with that ID? this shouldn't happen normally - // (as the ID we use is not a real ID). this is an odd error worth logging. - err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p) - log.Errorf("%s", err) - merr = append(merr, err) - } - } - - sequential := true - if sequential { - // these should be parallel normally. but can make them sequential for debugging. - // note that the core/bootstrap context deadline should be extended too for that. - for i := 0; i < queries; i++ { - id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) - runQuery(ctx, id) - } - - } else { - // note on parallelism here: the context is passed in to the queries, so they - // **should** exit when it exceeds, making this function exit on ctx cancel. - // normally, we should be selecting on ctx.Done() here too, but this gets - // complicated to do with WaitGroup, and doesnt wait for the children to exit. - var wg sync.WaitGroup - for i := 0; i < queries; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) - runQuery(ctx, id) - }() - } - wg.Wait() - } - - if len(merr) > 0 { - return merr - } - return nil -} diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go new file mode 100644 index 00000000000..271fa747493 --- /dev/null +++ b/routing/dht/dht_bootstrap.go @@ -0,0 +1,181 @@ +// Package dht implements a distributed hash table that satisfies the ipfs routing +// interface. 
This DHT is modeled after kademlia with Coral and S/Kademlia modifications. +package dht + +import ( + "crypto/rand" + "fmt" + "sync" + "time" + + peer "github.com/jbenet/go-ipfs/p2p/peer" + routing "github.com/jbenet/go-ipfs/routing" + u "github.com/jbenet/go-ipfs/util" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +// DefaultBootstrapQueries specifies how many queries to run, +// if the user does not specify a different number as an option. +// +// For now, this is set to 16 queries, which is an aggressive number. +// We are currently more interested in ensuring we have a properly formed +// DHT than making sure our dht minimizes traffic. Once we are more certain +// of our implementation's robustness, we should lower this down to 8 or 4. +// +// Note there is also a tradeoff between the bootstrap period and the number +// of queries. We could support a higher period with a smaller number of +// queries +const DefaultBootstrapQueries = 16 + +// DefaultBootstrapPeriod specifies how often to periodically run bootstrap, +// if the user does not specify a different number as an option. +// +// For now, this is set to 10 seconds, which is an aggressive period. We are +// We are currently more interested in ensuring we have a properly formed +// DHT than making sure our dht minimizes traffic. Once we are more certain +// implementation's robustness, we should lower this down to 30s or 1m. +// +// Note there is also a tradeoff between the bootstrap period and the number +// of queries. We could support a higher period with a smaller number of +// queries +const DefaultBootstrapPeriod = time.Duration(10 * time.Second) + +// Bootstrap runs bootstrapping once, then calls SignalBootstrap with default +// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows +// the user to catch an error off the bat if the connections are faulty. It also +// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful +// for instrumenting it on tests, or delaying bootstrap until the network is online +// and connected to at least a few nodes. +// +// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it. +func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) { + + if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil { + return nil, err + } + + sig := time.Tick(DefaultBootstrapPeriod) + return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig) +} + +// SignalBootstrap ensures the dht routing table remains healthy as peers come and go. +// it builds up a list of peers by requesting random peer IDs. The Bootstrap +// process will run a number of queries each time, and run every time signal fires. +// These parameters are configurable. +// +// SignalBootstrap returns a process, so the user can stop it. +func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) { + if queries <= 0 { + return nil, fmt.Errorf("invalid number of queries: %d", queries) + } + + if signal == nil { + return nil, fmt.Errorf("invalid signal: %v", signal) + } + + proc := goprocess.Go(func(worker goprocess.Process) { + for { + select { + case <-worker.Closing(): + log.Debug("dht bootstrapper shutting down") + return + + case <-signal: + // it would be useful to be able to send out signals of when we bootstrap, too... 
+ // maybe this is a good case for whole module event pub/sub? + + ctx := dht.Context() + if err := dht.runBootstrap(ctx, queries); err != nil { + log.Error(err) + // A bootstrapping error is important to notice but not fatal. + // maybe the client should be able to consume these errors, + // though I dont have a clear use case in mind-- what **could** + // the client do if one of the bootstrap calls fails? + // + // This is also related to the core's bootstrap failures. + // superviseConnections should perhaps allow clients to detect + // bootstrapping problems. + // + // Anyway, passing errors could be done with a bootstrapper object. + // this would imply the client should be able to consume a lot of + // other non-fatal dht errors too. providing this functionality + // should be done correctly DHT-wide. + // NB: whatever the design, clients must ensure they drain errors! + // This pattern is common to many things, perhaps long-running services + // should have something like an ErrStream that allows clients to consume + // periodic errors and take action. It should allow the user to also + // ignore all errors with something like an ErrStreamDiscard. We should + // study what other systems do for ideas. + } + } + } + }) + + return proc, nil +} + +// runBootstrap builds up list of peers by requesting random peer IDs +func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { + + var merr u.MultiErr + + randomID := func() peer.ID { + // 16 random bytes is not a valid peer id. it may be fine becuase + // the dht will rehash to its own keyspace anyway. + id := make([]byte, 16) + rand.Read(id) + return peer.ID(id) + } + + // bootstrap sequentially, as results will compound + runQuery := func(ctx context.Context, id peer.ID) { + p, err := dht.FindPeer(ctx, id) + if err == routing.ErrNotFound { + // this isn't an error. this is precisely what we expect. + } else if err != nil { + merr = append(merr, err) + } else { + // woah, actually found a peer with that ID? this shouldn't happen normally + // (as the ID we use is not a real ID). this is an odd error worth logging. + err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p) + log.Errorf("%s", err) + merr = append(merr, err) + } + } + + sequential := true + if sequential { + // these should be parallel normally. but can make them sequential for debugging. + // note that the core/bootstrap context deadline should be extended too for that. + for i := 0; i < queries; i++ { + id := randomID() + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) + runQuery(ctx, id) + } + + } else { + // note on parallelism here: the context is passed in to the queries, so they + // **should** exit when it exceeds, making this function exit on ctx cancel. + // normally, we should be selecting on ctx.Done() here too, but this gets + // complicated to do with WaitGroup, and doesnt wait for the children to exit. 
+ var wg sync.WaitGroup + for i := 0; i < queries; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + id := randomID() + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) + runQuery(ctx, id) + }() + } + wg.Wait() + } + + if len(merr) > 0 { + return merr + } + return nil +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 07211f5fea1..afc5756e828 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -75,25 +75,20 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { ctx, cancel := context.WithCancel(ctx) + log.Error("hmm") + defer log.Error("hmm end") + log.Debugf("bootstrapping dhts...") - rounds := 1 + // tried async. sequential fares much better. compare: + // 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2 + // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd + // probably because results compound - for i := 0; i < rounds; i++ { - log.Debugf("bootstrapping round %d/%d\n", i, rounds) - - // tried async. sequential fares much better. compare: - // 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2 - // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd - // probably because results compound - - start := rand.Intn(len(dhts)) // randomize to decrease bias. - for i := range dhts { - dht := dhts[(start+i)%len(dhts)] - log.Debugf("bootstrapping round %d/%d -- %s\n", i, rounds, dht.self) - dht.Bootstrap(ctx, 3) - } + start := rand.Intn(len(dhts)) // randomize to decrease bias. + for i := range dhts { + dht := dhts[(start+i)%len(dhts)] + dht.runBootstrap(ctx, 3) } - cancel() } @@ -235,6 +230,53 @@ func TestProvides(t *testing.T) { } } +// if minPeers or avgPeers is 0, dont test for it. +func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool { + // test "well-formed-ness" (>= minPeers peers in every routing table) + + checkTables := func() bool { + totalPeers := 0 + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + totalPeers += rtlen + if minPeers > 0 && rtlen < minPeers { + t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) + return false + } + } + actualAvgPeers := totalPeers / len(dhts) + t.Logf("avg rt size: %d", actualAvgPeers) + if avgPeers > 0 && actualAvgPeers < avgPeers { + t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers) + return false + } + return true + } + + timeoutA := time.After(timeout) + for { + select { + case <-timeoutA: + log.Error("did not reach well-formed routing tables by %s", timeout) + return false // failed + case <-time.After(5 * time.Millisecond): + if checkTables() { + return true // succeeded + } + } + } +} + +func printRoutingTables(dhts []*IpfsDHT) { + // the routing tables should be full now. let's inspect them. + fmt.Println("checking routing table of %d", len(dhts)) + for _, dht := range dhts { + fmt.Printf("checking routing table of %s\n", dht.self) + dht.routingTable.Print() + fmt.Println("") + } +} + func TestBootstrap(t *testing.T) { // t.Skip("skipping test to debug another") if testing.Short() { @@ -258,38 +300,105 @@ func TestBootstrap(t *testing.T) { } <-time.After(100 * time.Millisecond) - t.Logf("bootstrapping them so they find each other", nDHTs) - ctxT, _ := context.WithTimeout(ctx, 5*time.Second) - bootstrap(t, ctxT, dhts) + // bootstrap a few times until we get good tables. 
+ stop := make(chan struct{}) + go func() { + for { + t.Logf("bootstrapping them so they find each other", nDHTs) + ctxT, _ := context.WithTimeout(ctx, 5*time.Second) + bootstrap(t, ctxT, dhts) + + select { + case <-time.After(50 * time.Millisecond): + continue // being explicit + case <-stop: + return + } + } + }() + + waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second) + close(stop) if u.Debug { // the routing tables should be full now. let's inspect them. - <-time.After(5 * time.Second) - t.Logf("checking routing table of %d", nDHTs) - for _, dht := range dhts { - fmt.Printf("checking routing table of %s\n", dht.self) - dht.routingTable.Print() - fmt.Println("") + printRoutingTables(dhts) + } +} + +func TestPeriodicBootstrap(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + nDHTs := 30 + _, _, dhts := setupDHTS(ctx, nDHTs, t) + defer func() { + for i := 0; i < nDHTs; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + // signal amplifier + amplify := func(signal chan time.Time, other []chan time.Time) { + for t := range signal { + for _, s := range other { + s <- t + } + } + for _, s := range other { + close(s) } } - // test "well-formed-ness" (>= 3 peers in every routing table) - avgsize := 0 + signal := make(chan time.Time) + allSignals := []chan time.Time{} + + // kick off periodic bootstrappers with instrumented signals. + for _, dht := range dhts { + s := make(chan time.Time) + allSignals = append(allSignals, s) + dht.BootstrapOnSignal(5, s) + } + go amplify(signal, allSignals) + + t.Logf("dhts are not connected.", nDHTs) + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + if rtlen > 0 { + t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen) + } + } + + for i := 0; i < nDHTs; i++ { + connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)]) + } + + t.Logf("dhts are now connected to 1-2 others.", nDHTs) for _, dht := range dhts { rtlen := dht.routingTable.Size() - avgsize += rtlen - t.Logf("routing table for %s has %d peers", dht.self, rtlen) - if rtlen < 4 { - // currently, we dont have good bootstrapping guarantees. - // t.Errorf("routing table for %s only has %d peers", dht.self, rtlen) + if rtlen > 2 { + t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen) } } - avgsize = avgsize / len(dhts) - avgsizeExpected := 6 - t.Logf("avg rt size: %d", avgsize) - if avgsize < avgsizeExpected { - t.Errorf("avg rt size: %d < %d", avgsize, avgsizeExpected) + if u.Debug { + printRoutingTables(dhts) + } + + t.Logf("bootstrapping them so they find each other", nDHTs) + signal <- time.Now() + + // this is async, and we dont know when it's finished with one cycle, so keep checking + // until the routing tables look better, or some long timeout for the failure case. + waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second) + + if u.Debug { + printRoutingTables(dhts) } } @@ -319,7 +428,6 @@ func TestProvidesMany(t *testing.T) { if u.Debug { // the routing tables should be full now. let's inspect them. 
- <-time.After(5 * time.Second) t.Logf("checking routing table of %d", nDHTs) for _, dht := range dhts { fmt.Printf("checking routing table of %s\n", dht.self) From c64338a816d52b737b3a79f804a078eda8ad7635 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 17 Jan 2015 14:54:36 -0800 Subject: [PATCH 02/20] diag/net: io must respect timeout ctx MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See the discussion below. A future commit will implement the closer change below, and rebase this one on top. <•jbenet> `n.Diagnostics.GetDiagnostic(time.Second * 20)` is not being respected. should it use a context instead? or is it a timeout because the timeout is sent to other nodes? <•jbenet> oh it's that the io doesnt respect the context so we're stuck waiting for responses. <•jbenet> this is that complex interface point between the world of contexts, and the world of io. ctxutil.Reader/Writer is made for this, but you have to make sure to defer close the stream. (see how dht_net uses it). i'd love to find a safer interface. not sure what it is, but we have to a) respect contexts, and b) allow using standard io.Reader/Writers. Maybe TRTTD <•jbenet> is have ctxutil.Reader/Writer take ReadCloser and WriteClosers and always close them. the user _must_ pass an ioutil. NopCloser to avoid ctxutil closing on you when you dont want it to. <•jbenet> this seems safer to me in the general case. --- diagnostics/diag.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/diagnostics/diag.go b/diagnostics/diag.go index a14f7d239f0..f745e2c2810 100644 --- a/diagnostics/diag.go +++ b/diagnostics/diag.go @@ -16,6 +16,7 @@ import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ctxutil "github.com/jbenet/go-ipfs/util/ctx" host "github.com/jbenet/go-ipfs/p2p/host" inet "github.com/jbenet/go-ipfs/p2p/net" @@ -220,8 +221,10 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Messa } defer s.Close() - r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) - w := ggio.NewDelimitedWriter(s) + cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func + cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func + r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(cw) start := time.Now() From 898b9696ca47545df781b214ad7d0623320b81cb Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 17 Jan 2015 16:44:29 -0800 Subject: [PATCH 03/20] diag/net: add timeout param to cmd --- core/commands/diag.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/commands/diag.go b/core/commands/diag.go index ca42858710c..98790f67400 100644 --- a/core/commands/diag.go +++ b/core/commands/diag.go @@ -36,6 +36,8 @@ type DiagnosticOutput struct { Peers []DiagnosticPeer } +var DefaultDiagnosticTimeout = time.Second * 20 + var DiagCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Generates diagnostic reports", @@ -57,6 +59,7 @@ connected peers and latencies between them. }, Options: []cmds.Option{ + cmds.StringOption("timeout", "diagnostic timeout duration"), cmds.StringOption("vis", "output vis. one of: "+strings.Join(visFmts, ", ")), }, @@ -75,7 +78,20 @@ connected peers and latencies between them. 
return nil, err } - info, err := n.Diagnostics.GetDiagnostic(time.Second * 20) + timeoutS, _, err := req.Option("timeout").String() + if err != nil { + return nil, err + } + timeout := DefaultDiagnosticTimeout + if timeoutS != "" { + t, err := time.ParseDuration(timeoutS) + if err != nil { + return nil, cmds.ClientError("error parsing timeout") + } + timeout = t + } + + info, err := n.Diagnostics.GetDiagnostic(timeout) if err != nil { return nil, err } From f6278735ef6ad4e8f3fda47ac6c58f33c46d248e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 17 Jan 2015 16:44:56 -0800 Subject: [PATCH 04/20] net/diag: recursively decrement timeouts. Not sure this works. we dont have tests for net diag. We should make some. cc @whyrusleeping. --- .../jbenet/go-logging/examples/example.go | 2 +- diagnostics/diag.go | 104 +++++++++--------- diagnostics/internal/pb/diagnostics.pb.go | 12 +- diagnostics/internal/pb/diagnostics.proto | 1 + diagnostics/internal/pb/timeout.go | 14 +++ 5 files changed, 78 insertions(+), 55 deletions(-) create mode 100644 diagnostics/internal/pb/timeout.go diff --git a/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go b/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go index 28168d00e22..0ad8a7beea8 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go @@ -3,7 +3,7 @@ package main import ( "os" - "github.com/op/go-logging" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging" ) var log = logging.MustGetLogger("example") diff --git a/diagnostics/diag.go b/diagnostics/diag.go index f745e2c2810..c54b7dd5164 100644 --- a/diagnostics/diag.go +++ b/diagnostics/diag.go @@ -7,6 +7,7 @@ import ( "bytes" "encoding/json" "errors" + "fmt" "io" "sync" "time" @@ -33,6 +34,7 @@ var log = util.Logger("diagnostics") var ProtocolDiag protocol.ID = "/ipfs/diagnostics" const ResponseTimeout = time.Second * 10 +const HopTimeoutDecrement = time.Second * 2 // Diagnostics is a net service that manages requesting and responding to diagnostic // requests @@ -149,39 +151,24 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) peers := d.getPeers() log.Debugf("Sending diagnostic request to %d peers.", len(peers)) - var out []*DiagInfo - di := d.getDiagInfo() - out = append(out, di) - pmes := newMessage(diagID) - respdata := make(chan []byte) - sends := 0 - for p, _ := range peers { - log.Debugf("Sending getDiagnostic to: %s", p) - sends++ - go func(p peer.ID) { - data, err := d.getDiagnosticFromPeer(ctx, p, pmes) - if err != nil { - log.Errorf("GetDiagnostic error: %v", err) - respdata <- nil - return - } - respdata <- data - }(p) + pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) // decrease timeout per hop + dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes) + if err != nil { + return nil, fmt.Errorf("diagnostic from peers err: %s", err) } - for i := 0; i < sends; i++ { - data := <-respdata - if data == nil { - continue - } - out = appendDiagnostics(data, out) + var out []*DiagInfo + di := d.getDiagInfo() + out = append(out, di) + for _, dpi := range dpeers { + out = appendDiagnostics(out, dpi) } return out, nil } -func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo { +func appendDiagnostics(cur []*DiagInfo, data []byte) []*DiagInfo { buf := bytes.NewBuffer(data) dec := json.NewDecoder(buf) for { @@ -198,6 +185,38 @@ func appendDiagnostics(data []byte, cur 
[]*DiagInfo) []*DiagInfo { return cur } +func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) ([][]byte, error) { + timeout := pmes.GetTimeoutDuration() + if timeout < 1 { + return nil, fmt.Errorf("timeout too short: %s", timeout) + } + ctx, _ = context.WithTimeout(ctx, timeout) + + respdata := make(chan []byte) + sendcount := 0 + for p, _ := range peers { + log.Debugf("Sending diagnostic request to peer: %s", p) + sendcount++ + go func(p peer.ID) { + out, err := d.getDiagnosticFromPeer(ctx, p, pmes) + if err != nil { + log.Errorf("getDiagnostic error: %v", err) + respdata <- nil + return + } + respdata <- out + }(p) + } + + outall := make([][]byte, 0, len(peers)) + for i := 0; i < sendcount; i++ { + out := <-respdata + outall = append(outall, out) + } + + return outall, nil +} + // TODO: this method no longer needed. func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) { rpmes, err := d.sendRequest(ctx, p, mes) @@ -259,38 +278,17 @@ func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message d.diagMap[pmes.GetDiagID()] = time.Now() d.diagLock.Unlock() - buf := new(bytes.Buffer) di := d.getDiagInfo() - buf.Write(di.Marshal()) - - ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout) - - respdata := make(chan []byte) - sendcount := 0 - for p, _ := range d.getPeers() { - log.Debugf("Sending diagnostic request to peer: %s", p) - sendcount++ - go func(p peer.ID) { - out, err := d.getDiagnosticFromPeer(ctx, p, pmes) - if err != nil { - log.Errorf("getDiagnostic error: %v", err) - respdata <- nil - return - } - respdata <- out - }(p) - } - - for i := 0; i < sendcount; i++ { - out := <-respdata - _, err := buf.Write(out) - if err != nil { - log.Errorf("getDiagnostic write output error: %v", err) - continue + resp.Data = di.Marshal() + dpeers, err := d.getDiagnosticFromPeers(context.TODO(), d.getPeers(), pmes) + if err != nil { + log.Errorf("diagnostic from peers err: %s", err) + } else { + for _, b := range dpeers { + resp.Data = append(resp.Data, b...) // concatenate them all. } } - resp.Data = buf.Bytes() return resp, nil } diff --git a/diagnostics/internal/pb/diagnostics.pb.go b/diagnostics/internal/pb/diagnostics.pb.go index 4aa721711eb..0da512c3ec2 100644 --- a/diagnostics/internal/pb/diagnostics.pb.go +++ b/diagnostics/internal/pb/diagnostics.pb.go @@ -14,15 +14,18 @@ It has these top-level messages: package diagnostics_pb import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" import math "math" -// Reference imports to suppress errors if they are not otherwise used. +// Reference proto, json, and math imports to suppress error if they are not otherwise used. 
var _ = proto.Marshal +var _ = &json.SyntaxError{} var _ = math.Inf type Message struct { DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"` Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"` + Timeout *int64 `protobuf:"varint,3,opt" json:"Timeout,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -44,5 +47,12 @@ func (m *Message) GetData() []byte { return nil } +func (m *Message) GetTimeout() int64 { + if m != nil && m.Timeout != nil { + return *m.Timeout + } + return 0 +} + func init() { } diff --git a/diagnostics/internal/pb/diagnostics.proto b/diagnostics/internal/pb/diagnostics.proto index 3ffe2f523ce..2202f7c246b 100644 --- a/diagnostics/internal/pb/diagnostics.proto +++ b/diagnostics/internal/pb/diagnostics.proto @@ -3,4 +3,5 @@ package diagnostics.pb; message Message { required string DiagID = 1; optional bytes Data = 2; + optional int64 Timeout = 3; // in nanoseconds } diff --git a/diagnostics/internal/pb/timeout.go b/diagnostics/internal/pb/timeout.go new file mode 100644 index 00000000000..f2043c0e7c3 --- /dev/null +++ b/diagnostics/internal/pb/timeout.go @@ -0,0 +1,14 @@ +package diagnostics_pb + +import ( + "time" +) + +func (m *Message) GetTimeoutDuration() time.Duration { + return time.Duration(m.GetTimeout()) +} + +func (m *Message) SetTimeoutDuration(t time.Duration) { + it := int64(t) + m.Timeout = &it +} From 65b657e0ea82a1d3fee66a48529d46b56d7ab1e6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 18 Jan 2015 04:53:02 +0000 Subject: [PATCH 05/20] stream back diagnostic responses as they are received --- diagnostics/diag.go | 210 ++++++++++++++++++++++---------------------- 1 file changed, 106 insertions(+), 104 deletions(-) diff --git a/diagnostics/diag.go b/diagnostics/diag.go index c54b7dd5164..dbdf2be8613 100644 --- a/diagnostics/diag.go +++ b/diagnostics/diag.go @@ -4,11 +4,9 @@ package diagnostics import ( - "bytes" "encoding/json" "errors" "fmt" - "io" "sync" "time" @@ -33,6 +31,8 @@ var log = util.Logger("diagnostics") // ProtocolDiag is the diagnostics protocol.ID var ProtocolDiag protocol.ID = "/ipfs/diagnostics" +var ErrAlreadyRunning = errors.New("diagnostic with that ID already running") + const ResponseTimeout = time.Second * 10 const HopTimeoutDecrement = time.Second * 2 @@ -159,86 +159,56 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) return nil, fmt.Errorf("diagnostic from peers err: %s", err) } - var out []*DiagInfo di := d.getDiagInfo() - out = append(out, di) - for _, dpi := range dpeers { - out = appendDiagnostics(out, dpi) + out := []*DiagInfo{di} + for dpi := range dpeers { + out = append(out, dpi) } return out, nil } -func appendDiagnostics(cur []*DiagInfo, data []byte) []*DiagInfo { - buf := bytes.NewBuffer(data) - dec := json.NewDecoder(buf) - for { - di := new(DiagInfo) - err := dec.Decode(di) - if err != nil { - if err != io.EOF { - log.Errorf("error decoding DiagInfo: %v", err) - } - break - } - cur = append(cur, di) +func decodeDiagJson(data []byte) (*DiagInfo, error) { + di := new(DiagInfo) + err := json.Unmarshal(data, di) + if err != nil { + return nil, err } - return cur -} -func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) ([][]byte, error) { - timeout := pmes.GetTimeoutDuration() - if timeout < 1 { - return nil, fmt.Errorf("timeout too short: %s", timeout) - } - ctx, _ = context.WithTimeout(ctx, timeout) + return di, nil +} - respdata := make(chan []byte) - sendcount := 0 +func (d *Diagnostics) getDiagnosticFromPeers(ctx 
context.Context, peers map[peer.ID]int, pmes *pb.Message) (<-chan *DiagInfo, error) { + respdata := make(chan *DiagInfo) + wg := sync.WaitGroup{} for p, _ := range peers { + wg.Add(1) log.Debugf("Sending diagnostic request to peer: %s", p) - sendcount++ go func(p peer.ID) { + defer wg.Done() out, err := d.getDiagnosticFromPeer(ctx, p, pmes) if err != nil { - log.Errorf("getDiagnostic error: %v", err) - respdata <- nil + log.Errorf("Error getting diagnostic from %s: %s", p, err) return } - respdata <- out + for d := range out { + respdata <- d + } }(p) } - outall := make([][]byte, 0, len(peers)) - for i := 0; i < sendcount; i++ { - out := <-respdata - outall = append(outall, out) - } - - return outall, nil -} - -// TODO: this method no longer needed. -func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) { - rpmes, err := d.sendRequest(ctx, p, mes) - if err != nil { - return nil, err - } - return rpmes.GetData(), nil -} + go func() { + wg.Wait() + close(respdata) + }() -func newMessage(diagID string) *pb.Message { - pmes := new(pb.Message) - pmes.DiagID = proto.String(diagID) - return pmes + return respdata, nil } -func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { - +func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (<-chan *DiagInfo, error) { s, err := d.host.NewStream(ProtocolDiag, p) if err != nil { return nil, err } - defer s.Close() cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func @@ -251,51 +221,57 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Messa return nil, err } - rpmes := new(pb.Message) - if err := r.ReadMsg(rpmes); err != nil { - return nil, err - } - if rpmes == nil { - return nil, errors.New("no response to request") - } + out := make(chan *DiagInfo) + go func() { - rtt := time.Since(start) - log.Infof("diagnostic request took: %s", rtt.String()) - return rpmes, nil -} + defer func() { + close(out) + s.Close() + rtt := time.Since(start) + log.Infof("diagnostic request took: %s", rtt.String()) + }() -func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message, error) { - log.Debugf("HandleDiagnostic from %s for id = %s", p, util.Key(pmes.GetDiagID()).B58String()) - resp := newMessage(pmes.GetDiagID()) + for { + rpmes := new(pb.Message) + if err := r.ReadMsg(rpmes); err != nil { + log.Errorf("Error reading diagnostic from stream: %s", err) + return + } + if rpmes == nil { + log.Error("Got no response back from diag request.") + return + } - // Make sure we havent already handled this request to prevent loops - d.diagLock.Lock() - _, found := d.diagMap[pmes.GetDiagID()] - if found { - d.diagLock.Unlock() - return resp, nil - } - d.diagMap[pmes.GetDiagID()] = time.Now() - d.diagLock.Unlock() + di, err := decodeDiagJson(rpmes.GetData()) + if err != nil { + log.Error(err) + return + } - di := d.getDiagInfo() - resp.Data = di.Marshal() - dpeers, err := d.getDiagnosticFromPeers(context.TODO(), d.getPeers(), pmes) - if err != nil { - log.Errorf("diagnostic from peers err: %s", err) - } else { - for _, b := range dpeers { - resp.Data = append(resp.Data, b...) // concatenate them all. 
+ select { + case out <- di: + case <-ctx.Done(): + return + } } - } - return resp, nil + }() + + return out, nil +} + +func newMessage(diagID string) *pb.Message { + pmes := new(pb.Message) + pmes.DiagID = proto.String(diagID) + return pmes } func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { - r := ggio.NewDelimitedReader(s, 32768) // maxsize - w := ggio.NewDelimitedWriter(s) + cr := ctxutil.NewReader(ctx, s) + cw := ctxutil.NewWriter(ctx, s) + r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize + w := ggio.NewDelimitedWriter(cw) // deserialize msg pmes := new(pb.Message) @@ -308,25 +284,51 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { log.Infof("[peer: %s] Got message from [%s]\n", d.self.Pretty(), s.Conn().RemotePeer()) - // dispatch handler. - p := s.Conn().RemotePeer() - rpmes, err := d.handleDiagnostic(p, pmes) - if err != nil { - log.Errorf("handleDiagnostic error: %s", err) + // Make sure we havent already handled this request to prevent loops + if err := d.startDiag(pmes.GetDiagID()); err != nil { return nil } - // if nil response, return it before serializing - if rpmes == nil { - return nil + resp := newMessage(pmes.GetDiagID()) + resp.Data = d.getDiagInfo().Marshal() + if err := w.WriteMsg(resp); err != nil { + log.Errorf("Failed to write protobuf message over stream: %s", err) + return err } - // serialize + send response msg - if err := w.WriteMsg(rpmes); err != nil { - log.Errorf("Failed to encode protobuf message: %v", err) - return nil + timeout := pmes.GetTimeoutDuration() + if timeout < HopTimeoutDecrement { + return fmt.Errorf("timeout too short: %s", timeout) + } + ctx, _ = context.WithTimeout(ctx, timeout) + pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) + + dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes) + if err != nil { + log.Errorf("diagnostic from peers err: %s", err) + return err } + for b := range dpeers { + resp := newMessage(pmes.GetDiagID()) + resp.Data = b.Marshal() + if err := w.WriteMsg(resp); err != nil { + log.Errorf("Failed to write protobuf message over stream: %s", err) + return err + } + } + + return nil +} +func (d *Diagnostics) startDiag(id string) error { + d.diagLock.Lock() + _, found := d.diagMap[id] + if found { + d.diagLock.Unlock() + return ErrAlreadyRunning + } + d.diagMap[id] = time.Now() + d.diagLock.Unlock() return nil } From 8966743ebd1c9bedcc354140c63ab3736d6a32d6 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 17 Jan 2015 20:02:58 -0800 Subject: [PATCH 06/20] try less aggressive bootstrap --- routing/dht/dht_bootstrap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go index 271fa747493..6efd53d3ae9 100644 --- a/routing/dht/dht_bootstrap.go +++ b/routing/dht/dht_bootstrap.go @@ -27,7 +27,7 @@ import ( // Note there is also a tradeoff between the bootstrap period and the number // of queries. We could support a higher period with a smaller number of // queries -const DefaultBootstrapQueries = 16 +const DefaultBootstrapQueries = 1 // DefaultBootstrapPeriod specifies how often to periodically run bootstrap, // if the user does not specify a different number as an option. 
From ec848c486b5f8fd45e20983a1b0767558103922b Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 18 Jan 2015 00:56:36 -0800 Subject: [PATCH 07/20] core: call dht bootstrap --- core/bootstrap.go | 23 +++++++++-------- core/core.go | 65 ++++++++++++++++++++++++++++++----------------- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/core/bootstrap.go b/core/bootstrap.go index e52188b5029..f8da807bf71 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -31,6 +31,8 @@ func superviseConnections(parent context.Context, store peer.Peerstore, peers []peer.PeerInfo) error { + var dhtAlreadyBootstrapping bool + for { ctx, _ := context.WithTimeout(parent, connectiontimeout) // TODO get config from disk so |peers| always reflects the latest @@ -38,6 +40,14 @@ func superviseConnections(parent context.Context, if err := bootstrap(ctx, h, route, store, peers); err != nil { log.Error(err) } + + if !dhtAlreadyBootstrapping { + dhtAlreadyBootstrapping = true // only call dht.Bootstrap once. + if _, err := route.Bootstrap(); err != nil { + log.Error(err) + } + } + select { case <-parent.Done(): return parent.Err() @@ -56,7 +66,7 @@ func bootstrap(ctx context.Context, connectedPeers := h.Network().Peers() if len(connectedPeers) >= recoveryThreshold { log.Event(ctx, "bootstrapSkip", h.ID()) - log.Debugf("%s bootstrap skipped -- connected to %d (> %d) nodes", + log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", h.ID(), len(connectedPeers), recoveryThreshold) return nil @@ -64,7 +74,7 @@ func bootstrap(ctx context.Context, numCxnsToCreate := recoveryThreshold - len(connectedPeers) log.Event(ctx, "bootstrapStart", h.ID()) - log.Debugf("%s bootstrapping to %d more nodes", h.ID(), numCxnsToCreate) + log.Debugf("%s core bootstrapping to %d more nodes", h.ID(), numCxnsToCreate) var notConnected []peer.PeerInfo for _, p := range bootstrapPeers { @@ -83,15 +93,6 @@ func bootstrap(ctx context.Context, return err } } - - // we can try running dht bootstrap even if we're connected to all bootstrap peers. - if len(h.Network().Conns()) > 0 { - if _, err := r.Bootstrap(); err != nil { - // log this as Info. later on, discern better between errors. - log.Infof("dht bootstrap err: %s", err) - return nil - } - } return nil } diff --git a/core/core.go b/core/core.go index dd922b7afd8..3ac954ed33e 100644 --- a/core/core.go +++ b/core/core.go @@ -235,29 +235,17 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { // TODO implement an offline namesys that serves only local names. n.Namesys = namesys.NewNameSystem(n.Routing) - // TODO consider moving connection supervision into the Network. We've - // discussed improvements to this Node constructor. One improvement - // would be to make the node configurable, allowing clients to inject - // an Exchange, Network, or Routing component and have the constructor - // manage the wiring. In that scenario, this dangling function is a bit - // awkward. 
- var bootstrapPeers []peer.PeerInfo - for _, bootstrap := range n.Repo.Config().Bootstrap { - p, err := toPeer(bootstrap) - if err != nil { - log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", n.Identity, err) - return err - } - bootstrapPeers = append(bootstrapPeers, p) - } - - go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, bootstrapPeers) - n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) - return nil + // prepare bootstrap peers from config + bpeers, err := n.loadBootstrapPeers() + if err != nil { + log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.Identity, err) + return debugerror.Wrap(err) + } + return n.Bootstrap(ctx, bpeers) } // teardown closes owned children. If any errors occur, this function returns @@ -310,11 +298,28 @@ func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error { // TODO what should return value be when in offlineMode? - if n.Routing != nil { - if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - return bootstrap(ctx, n.PeerHost, dht, n.Peerstore, peers) - } + if n.Routing == nil { + return nil + } + + // TODO what bootstrapping should happen if there is no DHT? i.e. we could + // continue connecting to our bootstrap peers, but for what purpose? + dhtRouting, ok := n.Routing.(*dht.IpfsDHT) + if !ok { + return nil } + + // TODO consider moving connection supervision into the Network. We've + // discussed improvements to this Node constructor. One improvement + // would be to make the node configurable, allowing clients to inject + // an Exchange, Network, or Routing component and have the constructor + // manage the wiring. In that scenario, this dangling function is a bit + // awkward. + + // spin off the node's connection supervisor. + // TODO, clean up how this thing works. Make the superviseConnections thing + // work like the DHT.Bootstrap. + go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, peers) return nil } @@ -355,6 +360,18 @@ func (n *IpfsNode) loadPrivateKey() error { return nil } +func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) { + var peers []peer.PeerInfo + for _, bootstrap := range n.Repo.Config().Bootstrap { + p, err := toPeer(bootstrap) + if err != nil { + return nil, err + } + peers = append(peers, p) + } + return peers, nil +} + // SetupOfflineRouting loads the local nodes private key and // uses it to instantiate a routing system in offline mode. // This is primarily used for offline ipns modifications. From 1493c9d80ab12ca5549006cc65a1a7a8105aa88e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 18 Jan 2015 00:58:34 -0800 Subject: [PATCH 08/20] dht/bootstrap: logging --- routing/dht/dht_bootstrap.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go index 6efd53d3ae9..7ee82fbefcf 100644 --- a/routing/dht/dht_bootstrap.go +++ b/routing/dht/dht_bootstrap.go @@ -42,6 +42,10 @@ const DefaultBootstrapQueries = 1 // queries const DefaultBootstrapPeriod = time.Duration(10 * time.Second) +// DefaultBootstrapTimeout specifies how long to wait for a bootstrap query +// to run. +const DefaultBootstrapTimeout = time.Duration(10 * time.Second) + // Bootstrap runs bootstrapping once, then calls SignalBootstrap with default // parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. 
This allows // the user to catch an error off the bat if the connections are faulty. It also @@ -76,10 +80,10 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop } proc := goprocess.Go(func(worker goprocess.Process) { + defer log.Debug("dht bootstrapper shutting down") for { select { case <-worker.Closing(): - log.Debug("dht bootstrapper shutting down") return case <-signal: @@ -118,6 +122,12 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop // runBootstrap builds up list of peers by requesting random peer IDs func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { + bslog := func(msg string) { + log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) + } + bslog("start") + defer bslog("end") + defer log.EventBegin(ctx, "dhtRunBootstrap").Done() var merr u.MultiErr From 9cd975ce36929db905ec95736116b4d1765710d5 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 18 Jan 2015 00:58:56 -0800 Subject: [PATCH 09/20] dht/bootstrap: timeout queries --- routing/dht/dht_bootstrap.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go index 7ee82fbefcf..095c194d676 100644 --- a/routing/dht/dht_bootstrap.go +++ b/routing/dht/dht_bootstrap.go @@ -140,6 +140,8 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { } // bootstrap sequentially, as results will compound + ctx, cancel := context.WithTimeout(ctx, DefaultBootstrapTimeout) + defer cancel() runQuery := func(ctx context.Context, id peer.ID) { p, err := dht.FindPeer(ctx, id) if err == routing.ErrNotFound { From 486536149e651d3dfeacbf137f43623ff9319308 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 18 Jan 2015 00:59:22 -0800 Subject: [PATCH 10/20] dht/query: err return NotFound case When some queries finished, but we got no result, it should be a simple NotFoundError. Only when every single query ended in error do we externalize those to the client, in case something major is going wrong --- routing/dht/ext_test.go | 10 ++++++++++ routing/dht/query.go | 10 +++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index e151af5d246..ab756b5e4d6 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -49,6 +49,10 @@ func TestGetFailures(t *testing.T) { // u.POut("Timout Test\n") ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond) if _, err := d.GetValue(ctx1, u.Key("test")); err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } + if err != context.DeadlineExceeded { t.Fatal("Got different error than we expected", err) } @@ -86,6 +90,9 @@ func TestGetFailures(t *testing.T) { ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second) _, err = d.GetValue(ctx2, u.Key("test")) if err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } if err != routing.ErrNotFound { t.Fatalf("Expected ErrNotFound, got: %s", err) } @@ -202,6 +209,9 @@ func TestNotFound(t *testing.T) { v, err := d.GetValue(ctx, u.Key("hello")) log.Debugf("get value got %v", v) if err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } switch err { case routing.ErrNotFound: //Success! 
diff --git a/routing/dht/query.go b/routing/dht/query.go index 53c232323c7..687d2621fd6 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -62,7 +62,7 @@ type dhtQueryRunner struct { peersRemaining todoctr.Counter // peersToQuery + currently processing result *dhtQueryResult // query result - errs []error // result errors. maybe should be a map[peer.ID]error + errs u.MultiErr // result errors. maybe should be a map[peer.ID]error rateLimit chan struct{} // processing semaphore log eventlog.EventLogger @@ -122,8 +122,12 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { r.RLock() defer r.RUnlock() - if len(r.errs) > 0 { - err = r.errs[0] // take the first? + err = routing.ErrNotFound + + // if every query to every peer failed, something must be very wrong. + if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() { + log.Debugf("query errs: %s", r.errs) + err = r.errs[0] } case <-r.cg.Closed(): From 5259cf06d64d9e9ecf9a6ffd8c8c6752ed1e8360 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 18 Jan 2015 01:00:25 -0800 Subject: [PATCH 11/20] dht: kick off all the queries wit every node in our rt s/kademlia calls for makign sure to query all peers we have in our routing table, not just those closest. this helps ensure most queries resolve properly. --- routing/dht/routing.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 07d9ddc431f..2054e03fd0c 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -88,9 +88,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // get closest peers in the routing table rtp := dht.routingTable.ListPeers() log.Debugf("peers in rt: %s", len(rtp), rtp) - - closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) - if closest == nil || len(closest) == 0 { + if len(rtp) == 0 { log.Warning("No peers from routing table!") return nil, errors.Wrap(kb.ErrLookupFailure) } @@ -111,7 +109,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { }) // run it! - result, err := query.Run(ctx, closest) + result, err := query.Run(ctx, rtp) if err != nil { return nil, err } @@ -170,7 +168,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn // to the given key func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { e := log.EventBegin(ctx, "getClosestPeers", &key) - tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + tablepeers := dht.routingTable.ListPeers() if len(tablepeers) == 0 { return nil, errors.Wrap(kb.ErrLookupFailure) } @@ -313,7 +311,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co return &dhtQueryResult{closerPeers: clpeers}, nil }) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + peers := dht.routingTable.ListPeers() _, err := query.Run(ctx, peers) if err != nil { log.Errorf("Query error: %s", err) @@ -329,13 +327,13 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er return pi, nil } - closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) - if closest == nil || len(closest) == 0 { + peers := dht.routingTable.ListPeers() + if len(peers) == 0 { return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure) } // Sanity... 
- for _, p := range closest { + for _, p := range peers { if p == id { log.Error("Found target peer in list of closest peers...") return dht.peerstore.PeerInfo(p), nil @@ -367,7 +365,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er }) // run it! - result, err := query.Run(ctx, closest) + result, err := query.Run(ctx, peers) if err != nil { return peer.PeerInfo{}, err } @@ -386,8 +384,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< peerchan := make(chan peer.PeerInfo, asyncQueryBuffer) peersSeen := peer.Set{} - closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) - if closest == nil || len(closest) == 0 { + peers := dht.routingTable.ListPeers() + if len(peers) == 0 { return nil, errors.Wrap(kb.ErrLookupFailure) } @@ -432,7 +430,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< // run it! run it asynchronously to gen peers as results are found. // this does no error checking go func() { - if _, err := query.Run(ctx, closest); err != nil { + if _, err := query.Run(ctx, peers); err != nil { log.Error(err) } From 8e9413bd578f38d0cb3f46447736289cf47d3332 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 19 Jan 2015 19:23:38 -0800 Subject: [PATCH 12/20] p2p/proto/mux: make log more useful --- p2p/protocol/mux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/protocol/mux.go b/p2p/protocol/mux.go index 91f28413f66..e2d121f961d 100644 --- a/p2p/protocol/mux.go +++ b/p2p/protocol/mux.go @@ -115,7 +115,7 @@ func (m *Mux) HandleSync(s inet.Stream) { return } - log.Infof("muxer handle protocol: %s", name) + log.Infof("muxer handle protocol %s: %s", s.Conn().RemotePeer(), name) handler(s) } From 773ee2e25df08e37c886685a37f02e8402db5ef8 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 20 Jan 2015 05:46:32 -0800 Subject: [PATCH 13/20] p2p/proto/id: more helpful log --- p2p/protocol/identify/id.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 3a0012287e9..c1c799b66ae 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -118,8 +118,8 @@ func (ids *IDService) ResponseHandler(s inet.Stream) { r := ggio.NewDelimitedReader(s, 2048) mes := pb.Identify{} if err := r.ReadMsg(&mes); err != nil { - log.Errorf("%s error receiving message from %s %s", ID, - c.RemotePeer(), c.RemoteMultiaddr()) + log.Errorf("%s error receiving message from %s %s %s", ID, + c.RemotePeer(), c.RemoteMultiaddr(), err) return } ids.consumeMessage(&mes, c) From 010cedf0a9893c4a63ee69cb2faef332412c404e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 20 Jan 2015 05:50:50 -0800 Subject: [PATCH 14/20] ipfs swarm peers: sort output --- core/commands/swarm.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 6fa3c8e93d4..4fc53022d25 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path" + "sort" cmds "github.com/jbenet/go-ipfs/commands" peer "github.com/jbenet/go-ipfs/p2p/peer" @@ -64,6 +65,7 @@ ipfs swarm peers lists the set of peers this node is connected to. 
addrs[i] = fmt.Sprintf("%s/%s", addr, pid.Pretty()) } + sort.Sort(sort.StringSlice(addrs)) return &stringList{addrs}, nil }, Marshalers: cmds.MarshalerMap{ From c43f97d64e8a7431553009ee588ace009a69ab09 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 20 Jan 2015 05:53:56 -0800 Subject: [PATCH 15/20] updated goprocess, for periodic --- Godeps/Godeps.json | 2 +- .../github.com/jbenet/goprocess/.travis.yml | 11 + .../src/github.com/jbenet/goprocess/LICENSE | 21 ++ .../src/github.com/jbenet/goprocess/README.md | 2 + .../github.com/jbenet/goprocess/goprocess.go | 2 +- .../jbenet/goprocess/impl-goroutines.go | 114 -------- .../github.com/jbenet/goprocess/impl-mutex.go | 31 ++- .../jbenet/goprocess/periodic/README.md | 4 + .../goprocess/periodic/examples_test.go | 85 ++++++ .../jbenet/goprocess/periodic/periodic.go | 232 ++++++++++++++++ .../goprocess/periodic/periodic_test.go | 260 ++++++++++++++++++ 11 files changed, 639 insertions(+), 125 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE delete mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index f7c7ab7cc7e..5c85e41170e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -172,7 +172,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8" + "Rev": "c37725a4a97d6ad772818b071ceef82789562142" }, { "ImportPath": "github.com/kr/binarydist", diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml new file mode 100644 index 00000000000..7669438ed9a --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml @@ -0,0 +1,11 @@ +language: go + +go: + - 1.2 + - 1.3 + - 1.4 + - release + - tip + +script: + - go test -v ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE b/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE new file mode 100644 index 00000000000..c7386b3c940 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Juan Batiz-Benet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md b/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md index a19ed31978c..e2f12e16d65 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md @@ -1,5 +1,7 @@ # goprocess - lifecycles in go +[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess) + (Based on https://github.com/jbenet/go-ctxgroup) - Godoc: https://godoc.org/github.com/jbenet/goprocess diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index afe848c6136..762cecb2075 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -18,7 +18,7 @@ import ( // More specifically, it fits this: // // p := WithTeardown(tf) // new process is created, it is now running. -// p.AddChild(q) // can register children **before** Closing. +// p.AddChild(q) // can register children **before** Closed(). // go p.Close() // blocks until done running teardown func. // <-p.Closing() // would now return true. // <-p.childrenDone() // wait on all children to be done diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go deleted file mode 100644 index 831dc938fb0..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go +++ /dev/null @@ -1,114 +0,0 @@ -// +build ignore - -// WARNING: this implementation is not correct. -// here only for historical purposes. - -package goprocess - -import ( - "sync" -) - -// process implements Process -type process struct { - children sync.WaitGroup // wait group for child goroutines - teardown TeardownFunc // called to run the teardown logic. - closing chan struct{} // closed once close starts. - closed chan struct{} // closed once close is done. - closeOnce sync.Once // ensure close is only called once. - closeErr error // error to return to clients of Close() -} - -// newProcess constructs and returns a Process. -// It will call tf TeardownFunc exactly once: -// **after** all children have fully Closed, -// **after** entering <-Closing(), and -// **before** <-Closed(). 
-func newProcess(tf TeardownFunc) *process { - if tf == nil { - tf = nilTeardownFunc - } - - return &process{ - teardown: tf, - closed: make(chan struct{}), - closing: make(chan struct{}), - } -} - -func (p *process) WaitFor(q Process) { - p.children.Add(1) // p waits on q to be done - go func(p *process, q Process) { - <-q.Closed() // wait until q is closed - p.children.Done() // p done waiting on q - }(p, q) -} - -func (p *process) AddChildNoWait(child Process) { - go func(p, child Process) { - <-p.Closing() // wait until p is closing - child.Close() // close child - }(p, child) -} - -func (p *process) AddChild(child Process) { - select { - case <-p.Closing(): - panic("attempt to add child to closing or closed process") - default: - } - - p.children.Add(1) // p waits on child to be done - go func(p *process, child Process) { - <-p.Closing() // wait until p is closing - child.Close() // close child and wait - p.children.Done() // p done waiting on child - }(p, child) -} - -func (p *process) Go(f ProcessFunc) Process { - select { - case <-p.Closing(): - panic("attempt to add child to closing or closed process") - default: - } - - // this is very similar to AddChild, but also runs the func - // in the child. we replicate it here to save one goroutine. - child := newProcessGoroutines(nil) - child.children.Add(1) // child waits on func to be done - p.AddChild(child) - go func() { - f(child) - child.children.Done() // wait on child's children to be done. - child.Close() // close to tear down. - }() - return child -} - -// Close is the external close function. -// it's a wrapper around internalClose that waits on Closed() -func (p *process) Close() error { - p.closeOnce.Do(p.doClose) - <-p.Closed() // sync.Once should block, but this checks chan is closed too - return p.closeErr -} - -func (p *process) Closing() <-chan struct{} { - return p.closing -} - -func (p *process) Closed() <-chan struct{} { - return p.closed -} - -// the _actual_ close process. -func (p *process) doClose() { - // this function should only be called once (hence the sync.Once). - // and it will panic (on closing channels) otherwise. - - close(p.closing) // signal that we're shutting down (Closing) - p.children.Wait() // wait till all children are done (before teardown) - p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) - close(p.closed) // signal that we're shut down (Closed) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index ed68b9a0349..633d5b056ab 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -92,16 +92,18 @@ func (p *process) Go(f ProcessFunc) Process { // it's a wrapper around internalClose that waits on Closed() func (p *process) Close() error { p.Lock() - defer p.Unlock() - // if already closed, get out. + // if already closing, or closed, get out. (but wait!) 
select { - case <-p.Closed(): + case <-p.Closing(): + p.Unlock() + <-p.Closed() return p.closeErr default: } p.doClose() + p.Unlock() return p.closeErr } @@ -120,12 +122,23 @@ func (p *process) doClose() { close(p.closing) // signal that we're shutting down (Closing) - for _, c := range p.children { - go c.Close() // force all children to shut down - } - - for _, w := range p.waitfors { - <-w.Closed() // wait till all waitfors are fully closed (before teardown) + for len(p.children) > 0 || len(p.waitfors) > 0 { + for _, c := range p.children { + go c.Close() // force all children to shut down + } + p.children = nil // clear them + + // we must be careful not to iterate over waitfors directly, as it may + // change under our feet. + wf := p.waitfors + p.waitfors = nil // clear them + for _, w := range wf { + // Here, we wait UNLOCKED, so that waitfors who are in the middle of + // adding a child to us can finish. we will immediately close the child. + p.Unlock() + <-w.Closed() // wait till all waitfors are fully closed (before teardown) + p.Lock() + } } p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md new file mode 100644 index 00000000000..7a2c55db1c6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md @@ -0,0 +1,4 @@ +# goprocess/periodic - periodic process creation + +- goprocess: https://github.com/jbenet/goprocess +- Godoc: https://godoc.org/github.com/jbenet/goprocess/periodic diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go new file mode 100644 index 00000000000..782353db158 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go @@ -0,0 +1,85 @@ +package periodicproc_test + +import ( + "fmt" + "time" + + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" +) + +func ExampleEvery() { + tock := make(chan struct{}) + + i := 0 + p := periodicproc.Every(time.Second, func(proc goprocess.Process) { + tock <- struct{}{} + fmt.Printf("hello %d\n", i) + i++ + }) + + <-tock + <-tock + <-tock + p.Close() + + // Output: + // hello 0 + // hello 1 + // hello 2 +} + +func ExampleTick() { + p := periodicproc.Tick(time.Second, func(proc goprocess.Process) { + fmt.Println("tick") + }) + + <-time.After(3*time.Second + 500*time.Millisecond) + p.Close() + + // Output: + // tick + // tick + // tick +} + +func ExampleTickGo() { + + // with TickGo, execution is not rate limited, + // there can be many in-flight simultaneously + + wait := make(chan struct{}) + p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) { + fmt.Println("tick") + <-wait + }) + + <-time.After(3*time.Second + 500*time.Millisecond) + + wait <- struct{}{} + wait <- struct{}{} + wait <- struct{}{} + p.Close() // blocks us until all children are closed. + + // Output: + // tick + // tick + // tick +} + +func ExampleOnSignal() { + sig := make(chan struct{}) + p := periodicproc.OnSignal(sig, func(proc goprocess.Process) { + fmt.Println("fire!") + }) + + sig <- struct{}{} + sig <- struct{}{} + sig <- struct{}{} + p.Close() + + // Output: + // fire! + // fire! + // fire! 
+} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go new file mode 100644 index 00000000000..ce1c4611e9c --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go @@ -0,0 +1,232 @@ +// Package periodic is part of github.com/jbenet/goprocess. +// It provides a simple periodic processor that calls a function +// periodically based on some options. +// +// For example: +// +// // use a time.Duration +// p := periodicproc.Every(time.Second, func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// <-time.After(5*time.Second) +// p.Close() +// +// // use a time.Time channel (like time.Ticker) +// p := periodicproc.Tick(time.Tick(time.Second), func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// <-time.After(5*time.Second) +// p.Close() +// +// // or arbitrary signals +// signal := make(chan struct{}) +// p := periodicproc.OnSignal(signal, func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// signal<- struct{}{} +// signal<- struct{}{} +// <-time.After(5 * time.Second) +// signal<- struct{}{} +// p.Close() +// +package periodicproc + +import ( + "time" + + gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +// Every calls the given ProcessFunc at periodic intervals. Internally, it uses +// <-time.After(interval), so it will have the behavior of waiting _at least_ +// interval in between calls. If you'd prefer the time.Ticker behavior, use +// periodicproc.Tick instead. +// This is sequentially rate limited, only one call will be in-flight at a time. +func Every(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-time.After(interval): + select { + case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// EveryGo calls the given ProcessFunc at periodic intervals. Internally, it uses +// <-time.After(interval) +// This is not rate limited, multiple calls could be in-flight at the same time. +func EveryGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-time.After(interval): + proc.Go(procfunc) + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// Tick constructs a ticker with interval, and calls the given ProcessFunc every +// time the ticker fires. +// This is sequentially rate limited, only one call will be in-flight at a time. +// +// p := periodicproc.Tick(time.Second, func(proc goprocess.Process) { +// fmt.Println("fire!") +// }) +// +// <-time.After(3 * time.Second) +// p.Close() +// +// // Output: +// // fire! +// // fire! +// // fire! +func Tick(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + ticker := time.NewTicker(interval) + callOnTicker(ticker.C, procfunc)(proc) + ticker.Stop() + }) +} + +// TickGo constructs a ticker with interval, and calls the given ProcessFunc every +// time the ticker fires. +// This is not rate limited, multiple calls could be in-flight at the same time. 
+// +// p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(10 * time.Second) // will not block sequential execution +// }) +// +// <-time.After(3 * time.Second) +// p.Close() +// +// // Output: +// // fire! +// // fire! +// // fire! +func TickGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + ticker := time.NewTicker(interval) + goCallOnTicker(ticker.C, procfunc)(proc) + ticker.Stop() + }) +} + +// Ticker calls the given ProcessFunc every time the ticker fires. +// This is sequentially rate limited, only one call will be in-flight at a time. +func Ticker(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(callOnTicker(ticker, procfunc)) +} + +// TickerGo calls the given ProcessFunc every time the ticker fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +func TickerGo(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(goCallOnTicker(ticker, procfunc)) +} + +func callOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc { + return func(proc gp.Process) { + for { + select { + case <-ticker: + select { + case <-proc.Go(pf).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + } +} + +func goCallOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc { + return func(proc gp.Process) { + for { + select { + case <-ticker: + proc.Go(pf) + case <-proc.Closing(): // we're told to close + return + } + } + } +} + +// OnSignal calls the given ProcessFunc every time the signal fires, and waits for it to exit. +// This is sequentially rate limited, only one call will be in-flight at a time. +// +// sig := make(chan struct{}) +// p := periodicproc.OnSignal(sig, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(time.Second) // delays sequential execution by 1 second +// }) +// +// sig<- struct{} +// sig<- struct{} +// sig<- struct{} +// +// // Output: +// // fire! +// // fire! +// // fire! +func OnSignal(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-sig: + select { + case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// OnSignalGo calls the given ProcessFunc every time the signal fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +// +// sig := make(chan struct{}) +// p := periodicproc.OnSignalGo(sig, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(time.Second) // wont block execution +// }) +// +// sig<- struct{} +// sig<- struct{} +// sig<- struct{} +// +// // Output: +// // fire! +// // fire! +// // fire! 
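A useful property of the sequential (non-Go) variants above is that they make signal-driven processes easy to test deterministically: a send on the signal channel cannot be accepted while a previous call is still in flight. A small test-style sketch in the spirit of periodic_test.go below, assuming the goprocess and periodicproc imports used in examples_test.go (hypothetical test, not part of this patch):

	func TestOnSignalIsSequential(t *testing.T) {
		sig := make(chan struct{})
		ran := make(chan struct{}, 1)
		p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
			ran <- struct{}{}
		})
		for i := 0; i < 3; i++ {
			sig <- struct{}{} // blocks until any previous call has fully finished
			<-ran             // observe exactly one invocation per signal
		}
		p.Close()
	}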
+func OnSignalGo(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-sig: + proc.Go(procfunc) + case <-proc.Closing(): // we're told to close + return + } + } + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go new file mode 100644 index 00000000000..c79ed50c64d --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go @@ -0,0 +1,260 @@ +package periodicproc + +import ( + "testing" + "time" + + ci "github.com/jbenet/go-cienv" + gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +var ( + grace = time.Millisecond * 5 + interval = time.Millisecond * 10 + timeout = time.Second * 5 +) + +func init() { + if ci.IsRunning() { + grace = time.Millisecond * 500 + interval = time.Millisecond * 1000 + timeout = time.Second * 15 + } +} + +func between(min, diff, max time.Duration) bool { + return min <= diff && diff <= max +} + +func testBetween(t *testing.T, min, diff, max time.Duration) { + if !between(min, diff, max) { + t.Error("time diff incorrect:", min, diff, max) + } +} + +type intervalFunc func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) + +func testSeq(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + p := toTest(times, nil) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + last = next + } + + go p.Close() + select { + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testSeqWait(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + + <-time.After(interval * 2) // make it wait. + last = time.Now() // make it now (sequential) + wait <- struct{}{} // release it. + } + + go p.Close() + + select { + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testSeqNoWait(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, 0, next.Sub(last), interval+grace) // min of 0 + + <-time.After(interval * 2) // make it wait. + last = time.Now() // make it now (sequential) + wait <- struct{}{} // release it. + } + + go p.Close() + +end: + select { + case wait <- struct{}{}: // drain any extras. + goto end + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testParallel(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + last = next + + <-time.After(interval * 2) // make it wait. + wait <- struct{}{} // release it. + } + + go p.Close() + +end: + select { + case wait <- struct{}{}: // drain any extras. 
+ goto end + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func TestEverySeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Every(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestEverySeqWait(t *testing.T) { + testSeqWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Every(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestEveryGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return EveryGo(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestEveryGoSeqParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return EveryGo(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Tick(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickSeqNoWait(t *testing.T) { + testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Tick(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickGo(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickGoSeqParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickGo(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickerSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Ticker(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickerSeqNoWait(t *testing.T) { + testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Ticker(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickerGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickerGo(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickerGoParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickerGo(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} From d6ce837d720ffc9f542f4c63f83364372f219f27 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 20 Jan 2015 07:38:20 -0800 Subject: [PATCH 16/20] core/bootstrap: cleaned up bootstrapping Moved it to its own package to isolate scope. 
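In rough outline, the supervision structure this patch introduces is: one periodic process that tops up connections to the configured bootstrap peers, with the DHT's own periodic bootstrap attached as a child so both stop together. A condensed sketch of that flow (startBootstrapping is a hypothetical helper name; the real code lives in core/bootstrap.go below, with the initial retry loop, logging and event details omitted):

	// startBootstrapping wires up periodic connection supervision and
	// attaches the DHT bootstrap process as a child.
	func startBootstrapping(ctx context.Context, n *IpfsNode, d *dht.IpfsDHT, peers []peer.PeerInfo) error {
		proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) {
			if err := bootstrapRound(ctx, n.PeerHost, d, n.Peerstore, peers); err != nil {
				log.Error(err)
			}
		})

		dbproc, err := d.Bootstrap(ctx) // DHT-level bootstrap: periodic random-ID queries
		if err != nil {
			proc.Close()
			return err
		}
		proc.AddChild(dbproc) // closing proc also closes the DHT bootstrapper

		go func() { // tie the supervisor's lifetime to the caller's context
			<-ctx.Done()
			proc.Close()
		}()
		return nil
	}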
--- core/bootstrap.go | 203 +++++++++++++++++++++++++---------- core/core.go | 22 +--- routing/dht/dht_bootstrap.go | 66 +++++------- 3 files changed, 179 insertions(+), 112 deletions(-) diff --git a/core/bootstrap.go b/core/bootstrap.go index f8da807bf71..74b2253ffe2 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -2,6 +2,7 @@ package core import ( "errors" + "fmt" "math/rand" "sync" "time" @@ -16,109 +17,187 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) +// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap +// peers to bootstrap correctly. +var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") + const ( - period = 30 * time.Second // how often to check connection status - connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect - recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value - numDHTBootstrapQueries = 15 // number of DHT queries to execute + // BootstrapPeriod governs the periodic interval at which the node will + // attempt to bootstrap. The bootstrap process is not very expensive, so + // this threshold can afford to be small (<=30s). + BootstrapPeriod = 30 * time.Second + + // BootstrapPeerThreshold governs the node Bootstrap process. If the node + // has less open connections than this number, it will open connections + // to the bootstrap nodes. From there, the routing system should be able + // to use the connections to the bootstrap nodes to connect to even more + // peers. Routing systems like the IpfsDHT do so in their own Bootstrap + // process, which issues random queries to find more peers. + BootstrapPeerThreshold = 4 + + // BootstrapConnectionTimeout determines how long to wait for a bootstrap + // connection attempt before cancelling it. + BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3 ) -func superviseConnections(parent context.Context, - h host.Host, - route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes - store peer.Peerstore, - peers []peer.PeerInfo) error { +// nodeBootstrapper is a small object used to bootstrap an IpfsNode. +type nodeBootstrapper struct { + node *IpfsNode +} - var dhtAlreadyBootstrapping bool +// TryToBootstrap starts IpfsNode bootstrapping. This function will run an +// initial bootstrapping phase before exiting: connect to several bootstrap +// nodes. This allows callers to call this function synchronously to: +// - check if an error occurrs (bootstrapping unsuccessful) +// - wait before starting services which require the node to be bootstrapped +// +// If bootstrapping initially fails, Bootstrap() will try again for a total of +// three times, before giving up completely. Note that in environments where a +// node may be initialized offline, as normal operation, BootstrapForever() +// should be used instead. +// +// Note: this function could be much cleaner if we were to relax the constraint +// that we want to exit **after** we have performed initial bootstrapping (and are +// thus connected to nodes). The constraint may not be that useful in practice. +// Consider cases when we initialize the node while disconnected from the internet. 
+// We don't want this launch to fail... want to continue launching the node, hoping +// that bootstrapping will work in the future if we get connected. +func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error { + n := nb.node + + // TODO what bootstrapping should happen if there is no DHT? i.e. we could + // continue connecting to our bootstrap peers, but for what purpose? for now + // simply exit without connecting to any of them. When we introduce another + // routing system that uses bootstrap peers we can change this. + dht, ok := n.Routing.(*dht.IpfsDHT) + if !ok { + return nil + } - for { - ctx, _ := context.WithTimeout(parent, connectiontimeout) - // TODO get config from disk so |peers| always reflects the latest - // information - if err := bootstrap(ctx, h, route, store, peers); err != nil { - log.Error(err) + for i := 0; i < 3; i++ { + if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { + return err } + } - if !dhtAlreadyBootstrapping { - dhtAlreadyBootstrapping = true // only call dht.Bootstrap once. - if _, err := route.Bootstrap(); err != nil { - log.Error(err) - } + // at this point we have done at least one round of initial bootstrap. + // we're ready to kick off dht bootstrapping. + dbproc, err := dht.Bootstrap(ctx) + if err != nil { + return err + } + + // kick off the node's periodic bootstrapping + proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) { + if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { + log.Error(err) } + }) + + // add dht bootstrap proc as a child, so it is closed automatically when we are. + proc.AddChild(dbproc) + + // we were given a context. instead of returning proc for the caller + // to manage, for now we just close the proc when context is done. + go func() { + <-ctx.Done() + proc.Close() + }() + return nil +} - select { - case <-parent.Done(): - return parent.Err() - case <-time.Tick(period): +// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(), +// BootstrapForever() will run indefinitely (until its context is cancelled). +// This is particularly useful for the daemon and other services, which may +// be started offline and will come online at a future date. +// +// TODO: check offline --to--> online case works well and doesn't hurt perf. +// We may still be dialing. We should check network config. 
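In practice the split is: short-lived callers that genuinely need the network should use TryToBootstrap and treat an error as fatal, while the daemon should prefer BootstrapForever so that starting offline is not an error. A hedged usage sketch (ctx, node, daemonMode and bootstrapPeers are assumed to exist in the caller):

	nb := nodeBootstrapper{node}
	if daemonMode {
		// keep retrying; the node may have been started offline.
		go nb.BootstrapForever(ctx, bootstrapPeers)
	} else {
		// fail fast if no bootstrap peers can be reached.
		if err := nb.TryToBootstrap(ctx, bootstrapPeers); err != nil {
			return err
		}
	}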
+func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error { + for { + if err := nb.TryToBootstrap(ctx, peers); err == nil { + return nil } } - return nil } -func bootstrap(ctx context.Context, - h host.Host, - r *dht.IpfsDHT, - ps peer.Peerstore, +func bootstrapRound(ctx context.Context, + host host.Host, + route *dht.IpfsDHT, + peerstore peer.Peerstore, bootstrapPeers []peer.PeerInfo) error { - connectedPeers := h.Network().Peers() - if len(connectedPeers) >= recoveryThreshold { - log.Event(ctx, "bootstrapSkip", h.ID()) - log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", - h.ID(), len(connectedPeers), recoveryThreshold) + ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout) + // determine how many bootstrap connections to open + connectedPeers := host.Network().Peers() + if len(connectedPeers) >= BootstrapPeerThreshold { + log.Event(ctx, "bootstrapSkip", host.ID()) + log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", + host.ID(), len(connectedPeers), BootstrapPeerThreshold) return nil } - numCxnsToCreate := recoveryThreshold - len(connectedPeers) - - log.Event(ctx, "bootstrapStart", h.ID()) - log.Debugf("%s core bootstrapping to %d more nodes", h.ID(), numCxnsToCreate) + numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers) + // filter out bootstrap nodes we are already connected to var notConnected []peer.PeerInfo for _, p := range bootstrapPeers { - if h.Network().Connectedness(p.ID) != inet.Connected { + if host.Network().Connectedness(p.ID) != inet.Connected { notConnected = append(notConnected, p) } } - // if not connected to all bootstrap peer candidates - if len(notConnected) > 0 { - var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) - log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset) - if err := connect(ctx, ps, r, randomSubset); err != nil { - log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", h.ID(), err) - return err - } + // if connected to all bootstrap peer candidates, exit + if len(notConnected) < 1 { + log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate) + return ErrNotEnoughBootstrapPeers + } + + // connect to a random susbset of bootstrap candidates + var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) + log.Event(ctx, "bootstrapStart", host.ID()) + log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset) + if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil { + log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", host.ID(), err) + return err } return nil } -func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error { +func bootstrapConnect(ctx context.Context, + ps peer.Peerstore, + route *dht.IpfsDHT, + peers []peer.PeerInfo) error { if len(peers) < 1 { - return errors.New("bootstrap set empty") + return ErrNotEnoughBootstrapPeers } + errs := make(chan error, len(peers)) var wg sync.WaitGroup for _, p := range peers { // performed asynchronously because when performed synchronously, if // one `Connect` call hangs, subsequent calls are more likely to // fail/abort due to an expiring context. + // Also, performed asynchronously for dial speed. 
wg.Add(1) go func(p peer.PeerInfo) { defer wg.Done() - log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID) - log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID) + log.Event(ctx, "bootstrapDial", route.LocalPeer(), p.ID) + log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID) ps.AddAddresses(p.ID, p.Addrs) - err := r.Connect(ctx, p.ID) + err := route.Connect(ctx, p.ID) if err != nil { log.Event(ctx, "bootstrapFailed", p.ID) - log.Criticalf("failed to bootstrap with %v: %s", p.ID, err) + log.Errorf("failed to bootstrap with %v: %s", p.ID, err) + errs <- err return } log.Event(ctx, "bootstrapSuccess", p.ID) @@ -126,6 +205,20 @@ func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []pee }(p) } wg.Wait() + + // our failure condition is when no connection attempt succeeded. + // So drain the errs channel, counting the results. + close(errs) + count := 0 + var err error + for err = range errs { + if err != nil { + count++ + } + } + if count == len(peers) { + return fmt.Errorf("failed to bootstrap. %s", err) + } return nil } diff --git a/core/core.go b/core/core.go index 3ac954ed33e..0bf6d394026 100644 --- a/core/core.go +++ b/core/core.go @@ -297,30 +297,12 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) { func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error { // TODO what should return value be when in offlineMode? - if n.Routing == nil { return nil } - // TODO what bootstrapping should happen if there is no DHT? i.e. we could - // continue connecting to our bootstrap peers, but for what purpose? - dhtRouting, ok := n.Routing.(*dht.IpfsDHT) - if !ok { - return nil - } - - // TODO consider moving connection supervision into the Network. We've - // discussed improvements to this Node constructor. One improvement - // would be to make the node configurable, allowing clients to inject - // an Exchange, Network, or Routing component and have the constructor - // manage the wiring. In that scenario, this dangling function is a bit - // awkward. - - // spin off the node's connection supervisor. - // TODO, clean up how this thing works. Make the superviseConnections thing - // work like the DHT.Bootstrap. - go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, peers) - return nil + nb := nodeBootstrapper{n} + return nb.TryToBootstrap(ctx, peers) } func (n *IpfsNode) loadID() error { diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go index 095c194d676..c3991972ce4 100644 --- a/routing/dht/dht_bootstrap.go +++ b/routing/dht/dht_bootstrap.go @@ -14,6 +14,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) // DefaultBootstrapQueries specifies how many queries to run, @@ -54,9 +55,9 @@ const DefaultBootstrapTimeout = time.Duration(10 * time.Second) // and connected to at least a few nodes. // // Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it. 
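The caller-facing change below is that the initial bootstrap round now uses the caller's context rather than the DHT's own, while the return value is still a process handle for the periodic part. A hedged usage sketch (assuming an online *IpfsDHT named d and a caller-owned ctx):

	proc, err := d.Bootstrap(ctx) // one round now, then every DefaultBootstrapPeriod
	if err != nil {
		return err // the very first round failed; connectivity is likely broken
	}
	defer proc.Close() // stop periodic bootstrapping on shutdown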
-func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) { +func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) { - if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil { + if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil { return nil, err } @@ -79,41 +80,32 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop return nil, fmt.Errorf("invalid signal: %v", signal) } - proc := goprocess.Go(func(worker goprocess.Process) { - defer log.Debug("dht bootstrapper shutting down") - for { - select { - case <-worker.Closing(): - return - - case <-signal: - // it would be useful to be able to send out signals of when we bootstrap, too... - // maybe this is a good case for whole module event pub/sub? - - ctx := dht.Context() - if err := dht.runBootstrap(ctx, queries); err != nil { - log.Error(err) - // A bootstrapping error is important to notice but not fatal. - // maybe the client should be able to consume these errors, - // though I dont have a clear use case in mind-- what **could** - // the client do if one of the bootstrap calls fails? - // - // This is also related to the core's bootstrap failures. - // superviseConnections should perhaps allow clients to detect - // bootstrapping problems. - // - // Anyway, passing errors could be done with a bootstrapper object. - // this would imply the client should be able to consume a lot of - // other non-fatal dht errors too. providing this functionality - // should be done correctly DHT-wide. - // NB: whatever the design, clients must ensure they drain errors! - // This pattern is common to many things, perhaps long-running services - // should have something like an ErrStream that allows clients to consume - // periodic errors and take action. It should allow the user to also - // ignore all errors with something like an ErrStreamDiscard. We should - // study what other systems do for ideas. - } - } + proc := periodicproc.Ticker(signal, func(worker goprocess.Process) { + // it would be useful to be able to send out signals of when we bootstrap, too... + // maybe this is a good case for whole module event pub/sub? + + ctx := dht.Context() + if err := dht.runBootstrap(ctx, queries); err != nil { + log.Error(err) + // A bootstrapping error is important to notice but not fatal. + // maybe the client should be able to consume these errors, + // though I dont have a clear use case in mind-- what **could** + // the client do if one of the bootstrap calls fails? + // + // This is also related to the core's bootstrap failures. + // superviseConnections should perhaps allow clients to detect + // bootstrapping problems. + // + // Anyway, passing errors could be done with a bootstrapper object. + // this would imply the client should be able to consume a lot of + // other non-fatal dht errors too. providing this functionality + // should be done correctly DHT-wide. + // NB: whatever the design, clients must ensure they drain errors! + // This pattern is common to many things, perhaps long-running services + // should have something like an ErrStream that allows clients to consume + // periodic errors and take action. It should allow the user to also + // ignore all errors with something like an ErrStreamDiscard. We should + // study what other systems do for ideas. 
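// For illustration, one shape such an error stream could take (hypothetical,
// not implemented in this patch):
//
//	type ErrStream <-chan error
//
//	errs := dht.Errs()          // service exposes its non-fatal errors
//	go func() {
//		for err := range errs { // client drains them and decides what to do
//			log.Errorf("dht (non-fatal): %s", err)
//		}
//	}()
//
// plus something like ErrStreamDiscard(errs) for clients that want to opt out.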
} }) From dd9c1b6243b9292980e474f553f58e93fb6f7f3f Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Tue, 20 Jan 2015 17:22:14 -0800 Subject: [PATCH 17/20] core/bootstrap: CR comments --- core/bootstrap.go | 7 ++++--- routing/dht/dht_bootstrap.go | 1 + routing/dht/dht_test.go | 2 -- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/bootstrap.go b/core/bootstrap.go index 74b2253ffe2..b1b0bfaa4d1 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -93,6 +93,7 @@ func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.Pee // kick off the node's periodic bootstrapping proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) { + defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { log.Error(err) } @@ -158,8 +159,8 @@ func bootstrapRound(ctx context.Context, } // connect to a random susbset of bootstrap candidates - var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) - log.Event(ctx, "bootstrapStart", host.ID()) + randomSubset := randomSubsetOfPeers(notConnected, numCxnsToCreate) + defer log.EventBegin(ctx, "bootstrapStart", host.ID()).Done() log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset) if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil { log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err)) @@ -189,7 +190,7 @@ func bootstrapConnect(ctx context.Context, wg.Add(1) go func(p peer.PeerInfo) { defer wg.Done() - log.Event(ctx, "bootstrapDial", route.LocalPeer(), p.ID) + defer log.EventBegin(ctx, "bootstrapDial", route.LocalPeer(), p.ID).Done() log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID) ps.AddAddresses(p.ID, p.Addrs) diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go index c3991972ce4..588bcfd754d 100644 --- a/routing/dht/dht_bootstrap.go +++ b/routing/dht/dht_bootstrap.go @@ -128,6 +128,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { // the dht will rehash to its own keyspace anyway. id := make([]byte, 16) rand.Read(id) + id = u.Hash(id) return peer.ID(id) } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index afc5756e828..2e1e1129fa5 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -75,8 +75,6 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { ctx, cancel := context.WithCancel(ctx) - log.Error("hmm") - defer log.Error("hmm end") log.Debugf("bootstrapping dhts...") // tried async. sequential fares much better. 
compare: From 95d58b2a4a79c308def0afb67d9688bb33ee46cb Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 23 Jan 2015 04:36:18 -0800 Subject: [PATCH 18/20] core: cleaned up bootstrap process --- core/bootstrap.go | 184 +++++++++++++------------ core/core.go | 113 +++++++-------- routing/dht/dht_bootstrap.go | 112 ++++++--------- test/epictest/addcat_test.go | 11 +- test/epictest/three_legged_cat_test.go | 12 +- 5 files changed, 211 insertions(+), 221 deletions(-) diff --git a/core/bootstrap.go b/core/bootstrap.go index b1b0bfaa4d1..84bf8e65203 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -3,6 +3,8 @@ package core import ( "errors" "fmt" + "io" + "io/ioutil" "math/rand" "sync" "time" @@ -18,6 +20,7 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) @@ -25,128 +28,116 @@ import ( // peers to bootstrap correctly. var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") -const ( - // BootstrapPeriod governs the periodic interval at which the node will - // attempt to bootstrap. The bootstrap process is not very expensive, so - // this threshold can afford to be small (<=30s). - BootstrapPeriod = 30 * time.Second +// BootstrapConfig specifies parameters used in an IpfsNode's network +// bootstrapping process. +type BootstrapConfig struct { - // BootstrapPeerThreshold governs the node Bootstrap process. If the node - // has less open connections than this number, it will open connections + // MinPeerThreshold governs whether to bootstrap more connections. If the + // node has less open connections than this number, it will open connections // to the bootstrap nodes. From there, the routing system should be able // to use the connections to the bootstrap nodes to connect to even more // peers. Routing systems like the IpfsDHT do so in their own Bootstrap // process, which issues random queries to find more peers. - BootstrapPeerThreshold = 4 + MinPeerThreshold int + + // Period governs the periodic interval at which the node will + // attempt to bootstrap. The bootstrap process is not very expensive, so + // this threshold can afford to be small (<=30s). + Period time.Duration - // BootstrapConnectionTimeout determines how long to wait for a bootstrap + // ConnectionTimeout determines how long to wait for a bootstrap // connection attempt before cancelling it. - BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3 -) + ConnectionTimeout time.Duration + + // BootstrapPeers is a function that returns a set of bootstrap peers + // for the bootstrap process to use. This makes it possible for clients + // to control the peers the process uses at any moment. + BootstrapPeers func() []peer.PeerInfo +} -// nodeBootstrapper is a small object used to bootstrap an IpfsNode. -type nodeBootstrapper struct { - node *IpfsNode +// DefaultBootstrapConfig specifies default sane parameters for bootstrapping. +var DefaultBootstrapConfig = BootstrapConfig{ + MinPeerThreshold: 4, + Period: 30 * time.Second, + ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3 } -// TryToBootstrap starts IpfsNode bootstrapping. 
This function will run an -// initial bootstrapping phase before exiting: connect to several bootstrap -// nodes. This allows callers to call this function synchronously to: -// - check if an error occurrs (bootstrapping unsuccessful) -// - wait before starting services which require the node to be bootstrapped -// -// If bootstrapping initially fails, Bootstrap() will try again for a total of -// three times, before giving up completely. Note that in environments where a -// node may be initialized offline, as normal operation, BootstrapForever() -// should be used instead. -// -// Note: this function could be much cleaner if we were to relax the constraint -// that we want to exit **after** we have performed initial bootstrapping (and are -// thus connected to nodes). The constraint may not be that useful in practice. -// Consider cases when we initialize the node while disconnected from the internet. -// We don't want this launch to fail... want to continue launching the node, hoping -// that bootstrapping will work in the future if we get connected. -func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error { - n := nb.node +func BootstrapConfigWithPeers(pis []peer.PeerInfo) BootstrapConfig { + cfg := DefaultBootstrapConfig + cfg.BootstrapPeers = func() []peer.PeerInfo { + return pis + } + return cfg +} + +// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically +// check the number of open connections and -- if there are too few -- initiate +// connections to well-known bootstrap peers. It also kicks off subsystem +// bootstrapping (i.e. routing). +func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // TODO what bootstrapping should happen if there is no DHT? i.e. we could // continue connecting to our bootstrap peers, but for what purpose? for now // simply exit without connecting to any of them. When we introduce another // routing system that uses bootstrap peers we can change this. - dht, ok := n.Routing.(*dht.IpfsDHT) + thedht, ok := n.Routing.(*dht.IpfsDHT) if !ok { - return nil + return ioutil.NopCloser(nil), nil } - for i := 0; i < 3; i++ { - if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { - return err + // the periodic bootstrap function -- the connection supervisor + periodic := func(worker goprocess.Process) { + ctx := procctx.WithProcessClosing(context.Background(), worker) + defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() + + if err := bootstrapRound(ctx, n.PeerHost, thedht, n.Peerstore, cfg); err != nil { + log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.Identity, err) } } - // at this point we have done at least one round of initial bootstrap. - // we're ready to kick off dht bootstrapping. - dbproc, err := dht.Bootstrap(ctx) + // kick off the node's periodic bootstrapping + proc := periodicproc.Tick(cfg.Period, periodic) + proc.Go(periodic) // run one right now. + + // kick off dht bootstrapping. 
+ dbproc, err := thedht.Bootstrap(dht.DefaultBootstrapConfig) if err != nil { - return err + proc.Close() + return nil, err } - // kick off the node's periodic bootstrapping - proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) { - defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() - if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil { - log.Error(err) - } - }) - // add dht bootstrap proc as a child, so it is closed automatically when we are. proc.AddChild(dbproc) - - // we were given a context. instead of returning proc for the caller - // to manage, for now we just close the proc when context is done. - go func() { - <-ctx.Done() - proc.Close() - }() - return nil -} - -// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(), -// BootstrapForever() will run indefinitely (until its context is cancelled). -// This is particularly useful for the daemon and other services, which may -// be started offline and will come online at a future date. -// -// TODO: check offline --to--> online case works well and doesn't hurt perf. -// We may still be dialing. We should check network config. -func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error { - for { - if err := nb.TryToBootstrap(ctx, peers); err == nil { - return nil - } - } + return proc, nil } func bootstrapRound(ctx context.Context, host host.Host, route *dht.IpfsDHT, peerstore peer.Peerstore, - bootstrapPeers []peer.PeerInfo) error { + cfg BootstrapConfig) error { + + ctx, _ = context.WithTimeout(ctx, cfg.ConnectionTimeout) + id := host.ID() - ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout) + // get bootstrap peers from config. retrieving them here makes + // sure we remain observant of changes to client configuration. 
+ peers := cfg.BootstrapPeers() // determine how many bootstrap connections to open - connectedPeers := host.Network().Peers() - if len(connectedPeers) >= BootstrapPeerThreshold { - log.Event(ctx, "bootstrapSkip", host.ID()) + connected := host.Network().Peers() + if len(connected) >= cfg.MinPeerThreshold { + log.Event(ctx, "bootstrapSkip", id) log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", - host.ID(), len(connectedPeers), BootstrapPeerThreshold) + id, len(connected), cfg.MinPeerThreshold) return nil } - numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers) + numToDial := cfg.MinPeerThreshold - len(connected) // filter out bootstrap nodes we are already connected to var notConnected []peer.PeerInfo - for _, p := range bootstrapPeers { + for _, p := range peers { if host.Network().Connectedness(p.ID) != inet.Connected { notConnected = append(notConnected, p) } @@ -154,17 +145,16 @@ func bootstrapRound(ctx context.Context, // if connected to all bootstrap peer candidates, exit if len(notConnected) < 1 { - log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate) + log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial) return ErrNotEnoughBootstrapPeers } // connect to a random susbset of bootstrap candidates - randomSubset := randomSubsetOfPeers(notConnected, numCxnsToCreate) - defer log.EventBegin(ctx, "bootstrapStart", host.ID()).Done() - log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset) - if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil { - log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", host.ID(), err) + randSubset := randomSubsetOfPeers(notConnected, numToDial) + + defer log.EventBegin(ctx, "bootstrapStart", id).Done() + log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset) + if err := bootstrapConnect(ctx, peerstore, route, randSubset); err != nil { return err } return nil @@ -196,12 +186,12 @@ func bootstrapConnect(ctx context.Context, ps.AddAddresses(p.ID, p.Addrs) err := route.Connect(ctx, p.ID) if err != nil { - log.Event(ctx, "bootstrapFailed", p.ID) + log.Event(ctx, "bootstrapDialFailed", p.ID) log.Errorf("failed to bootstrap with %v: %s", p.ID, err) errs <- err return } - log.Event(ctx, "bootstrapSuccess", p.ID) + log.Event(ctx, "bootstrapDialSuccess", p.ID) log.Infof("bootstrapped with %v", p.ID) }(p) } @@ -223,7 +213,19 @@ func bootstrapConnect(ctx context.Context, return nil } -func toPeer(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) { +func toPeerInfos(bpeers []config.BootstrapPeer) ([]peer.PeerInfo, error) { + var peers []peer.PeerInfo + for _, bootstrap := range bpeers { + p, err := toPeerInfo(bootstrap) + if err != nil { + return nil, err + } + peers = append(peers, p) + } + return peers, nil +} + +func toPeerInfo(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) { id, err := peer.IDB58Decode(bootstrap.PeerID) if err != nil { return diff --git a/core/core.go b/core/core.go index 0bf6d394026..a2839f610dd 100644 --- a/core/core.go +++ b/core/core.go @@ -11,33 +11,36 @@ import ( datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" + debugerror "github.com/jbenet/go-ipfs/util/debugerror" + + diag "github.com/jbenet/go-ipfs/diagnostics" + ic 
"github.com/jbenet/go-ipfs/p2p/crypto" + p2phost "github.com/jbenet/go-ipfs/p2p/host" + p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic" + swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" + addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" + peer "github.com/jbenet/go-ipfs/p2p/peer" + + routing "github.com/jbenet/go-ipfs/routing" + dht "github.com/jbenet/go-ipfs/routing/dht" + offroute "github.com/jbenet/go-ipfs/routing/offline" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bserv "github.com/jbenet/go-ipfs/blockservice" - diag "github.com/jbenet/go-ipfs/diagnostics" exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" offline "github.com/jbenet/go-ipfs/exchange/offline" rp "github.com/jbenet/go-ipfs/exchange/reprovide" + mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" - ic "github.com/jbenet/go-ipfs/p2p/crypto" - p2phost "github.com/jbenet/go-ipfs/p2p/host" - p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic" - swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" - addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" - peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" pin "github.com/jbenet/go-ipfs/pin" repo "github.com/jbenet/go-ipfs/repo" config "github.com/jbenet/go-ipfs/repo/config" - routing "github.com/jbenet/go-ipfs/routing" - dht "github.com/jbenet/go-ipfs/routing/dht" - offroute "github.com/jbenet/go-ipfs/routing/offline" - eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" - debugerror "github.com/jbenet/go-ipfs/util/debugerror" - lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) const IpnsValidatorTag = "ipns" @@ -75,13 +78,14 @@ type IpfsNode struct { Resolver *path.Resolver // the path resolution system // Online - PrivateKey ic.PrivKey // the local node's private Key - PeerHost p2phost.Host // the network host (server+client) - Routing routing.IpfsRouting // the routing system. recommend ipfs-dht - Exchange exchange.Interface // the block exchange + strategy (bitswap) - Namesys namesys.NameSystem // the name system, resolves paths to hashes - Diagnostics *diag.Diagnostics // the diagnostics service - Reprovider *rp.Reprovider // the value reprovider system + PrivateKey ic.PrivKey // the local node's private Key + PeerHost p2phost.Host // the network host (server+client) + Bootstrapper io.Closer // the periodic bootstrapper + Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + Exchange exchange.Interface // the block exchange + strategy (bitswap) + Namesys namesys.NameSystem // the name system, resolves paths to hashes + Diagnostics *diag.Diagnostics // the diagnostics service + Reprovider *rp.Reprovider // the value reprovider system ctxgroup.ContextGroup @@ -238,14 +242,7 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) - // prepare bootstrap peers from config - bpeers, err := n.loadBootstrapPeers() - if err != nil { - log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", n.Identity, err) - return debugerror.Wrap(err) - } - return n.Bootstrap(ctx, bpeers) + return n.Bootstrap(DefaultBootstrapConfig) } // teardown closes owned children. 
If any errors occur, this function returns
@@ -254,20 +251,20 @@ func (n *IpfsNode) teardown() error {
 	// owned objects are closed in this teardown to ensure that they're closed
 	// regardless of which constructor was used to add them to the node.
 	var closers []io.Closer
-	if n.Repo != nil {
-		closers = append(closers, n.Repo)
-	}
-	if n.Blocks != nil {
-		closers = append(closers, n.Blocks)
-	}
-	if n.Routing != nil {
-		if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
-			closers = append(closers, dht)
+	addCloser := func(c io.Closer) {
+		if c != nil {
+			closers = append(closers, c)
 		}
 	}
-	if n.PeerHost != nil {
-		closers = append(closers, n.PeerHost)
+
+	addCloser(n.Bootstrapper)
+	addCloser(n.Repo)
+	addCloser(n.Blocks)
+	if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
+		addCloser(dht)
 	}
+	addCloser(n.PeerHost)
+
 	var errs []error
 	for _, closer := range closers {
 		if err := closer.Close(); err != nil {
@@ -293,16 +290,34 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) {
 	return n.Resolver.ResolvePath(path)
 }
 
-// Bootstrap is undefined when node is not in OnlineMode
-func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {
+func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error {
 
 	// TODO what should return value be when in offlineMode?
 	if n.Routing == nil {
 		return nil
 	}
-	nb := nodeBootstrapper{n}
-	return nb.TryToBootstrap(ctx, peers)
+
+	if n.Bootstrapper != nil {
+		n.Bootstrapper.Close() // stop previous bootstrap process.
+	}
+
+	// if the caller did not specify a bootstrap peer function, get the
+	// freshest bootstrap peers from config. this responds to live changes.
+	if cfg.BootstrapPeers == nil {
+		cfg.BootstrapPeers = func() []peer.PeerInfo {
+			bpeers := n.Repo.Config().Bootstrap
+			ps, err := toPeerInfos(bpeers)
+			if err != nil {
+				log.Errorf("failed to parse bootstrap peers from config: %s", bpeers)
+				return nil
+			}
+			return ps
+		}
+	}
+
+	var err error
+	n.Bootstrapper, err = Bootstrap(n, cfg)
+	return err
 }
 
 func (n *IpfsNode) loadID() error {
@@ -342,18 +357,6 @@ func (n *IpfsNode) loadPrivateKey() error {
 	return nil
 }
 
-func (n *IpfsNode) loadBootstrapPeers() ([]peer.PeerInfo, error) {
-	var peers []peer.PeerInfo
-	for _, bootstrap := range n.Repo.Config().Bootstrap {
-		p, err := toPeer(bootstrap)
-		if err != nil {
-			return nil, err
-		}
-		peers = append(peers, p)
-	}
-	return peers, nil
-}
-
 // SetupOfflineRouting loads the local nodes private key and
 // uses it to instantiate a routing system in offline mode.
 // This is primarily used for offline ipns modifications.
diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go
index 588bcfd754d..c91df05e5f1 100644
--- a/routing/dht/dht_bootstrap.go
+++ b/routing/dht/dht_bootstrap.go
@@ -17,52 +17,42 @@ import (
 	periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
 )
 
-// DefaultBootstrapQueries specifies how many queries to run,
-// if the user does not specify a different number as an option.
+// BootstrapConfig specifies parameters used when bootstrapping the DHT.
 //
-// For now, this is set to 16 queries, which is an aggressive number.
-// We are currently more interested in ensuring we have a properly formed
-// DHT than making sure our dht minimizes traffic. Once we are more certain
-// of our implementation's robustness, we should lower this down to 8 or 4.
-//
-// Note there is also a tradeoff between the bootstrap period and the number
-// of queries. We could support a higher period with a smaller number of
-// queries
-const DefaultBootstrapQueries = 1
+// Note there is a tradeoff between the bootstrap period and the
+// number of queries. We could support a higher period with fewer
+// queries.
+type BootstrapConfig struct {
+	Queries int           // how many queries to run per period
+	Period  time.Duration // how often to run periodic bootstrap.
+	Timeout time.Duration // how long to wait for a bootstrap query to run
+}
 
-// DefaultBootstrapPeriod specifies how often to periodically run bootstrap,
-// if the user does not specify a different number as an option.
-//
-// For now, this is set to 10 seconds, which is an aggressive period. We are
-// We are currently more interested in ensuring we have a properly formed
-// DHT than making sure our dht minimizes traffic. Once we are more certain
-// implementation's robustness, we should lower this down to 30s or 1m.
-//
-// Note there is also a tradeoff between the bootstrap period and the number
-// of queries. We could support a higher period with a smaller number of
-// queries
-const DefaultBootstrapPeriod = time.Duration(10 * time.Second)
-
-// DefaultBootstrapTimeout specifies how long to wait for a bootstrap query
-// to run.
-const DefaultBootstrapTimeout = time.Duration(10 * time.Second)
-
-// Bootstrap runs bootstrapping once, then calls SignalBootstrap with default
-// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
-// the user to catch an error off the bat if the connections are faulty. It also
-// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
-// for instrumenting it on tests, or delaying bootstrap until the network is online
-// and connected to at least a few nodes.
-//
-// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
-func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
+var DefaultBootstrapConfig = BootstrapConfig{
+	// For now, this is set to 1 query.
+	// We are currently more interested in ensuring we have a properly formed
+	// DHT than making sure our dht minimizes traffic. Once we are more certain
+	// of our implementation's robustness, we should settle on 8 or 4.
+	Queries: 1,
 
-	if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil {
-		return nil, err
-	}
+	// For now, this is set to 20 seconds, which is an aggressive period.
+	// We are currently more interested in ensuring we have a properly formed
+	// DHT than making sure our dht minimizes traffic. Once we are more certain
+	// of our implementation's robustness, we should raise this to 30s or 1m.
+	Period: time.Duration(20 * time.Second),
 
-	sig := time.Tick(DefaultBootstrapPeriod)
-	return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
+	Timeout: time.Duration(20 * time.Second),
+}
+
+// Bootstrap ensures the dht routing table remains healthy as peers come and go.
+// It builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process runs a number of queries each time the period ticks.
+// These parameters are configurable.
+//
+// Bootstrap returns a process, so the user can stop it.
+func (dht *IpfsDHT) Bootstrap(config BootstrapConfig) (goprocess.Process, error) {
+	sig := time.Tick(config.Period)
+	return dht.BootstrapOnSignal(config, sig)
 }
 
 // SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
@@ -71,9 +61,9 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) { // These parameters are configurable. // // SignalBootstrap returns a process, so the user can stop it. -func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) { - if queries <= 0 { - return nil, fmt.Errorf("invalid number of queries: %d", queries) +func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) { + if cfg.Queries <= 0 { + return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) } if signal == nil { @@ -85,27 +75,9 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop // maybe this is a good case for whole module event pub/sub? ctx := dht.Context() - if err := dht.runBootstrap(ctx, queries); err != nil { + if err := dht.runBootstrap(ctx, cfg); err != nil { log.Error(err) // A bootstrapping error is important to notice but not fatal. - // maybe the client should be able to consume these errors, - // though I dont have a clear use case in mind-- what **could** - // the client do if one of the bootstrap calls fails? - // - // This is also related to the core's bootstrap failures. - // superviseConnections should perhaps allow clients to detect - // bootstrapping problems. - // - // Anyway, passing errors could be done with a bootstrapper object. - // this would imply the client should be able to consume a lot of - // other non-fatal dht errors too. providing this functionality - // should be done correctly DHT-wide. - // NB: whatever the design, clients must ensure they drain errors! - // This pattern is common to many things, perhaps long-running services - // should have something like an ErrStream that allows clients to consume - // periodic errors and take action. It should allow the user to also - // ignore all errors with something like an ErrStreamDiscard. We should - // study what other systems do for ideas. } }) @@ -113,7 +85,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop } // runBootstrap builds up list of peers by requesting random peer IDs -func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { +func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { bslog := func(msg string) { log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) } @@ -133,7 +105,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { } // bootstrap sequentially, as results will compound - ctx, cancel := context.WithTimeout(ctx, DefaultBootstrapTimeout) + ctx, cancel := context.WithTimeout(ctx, cfg.Timeout) defer cancel() runQuery := func(ctx context.Context, id peer.ID) { p, err := dht.FindPeer(ctx, id) @@ -154,9 +126,9 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { if sequential { // these should be parallel normally. but can make them sequential for debugging. // note that the core/bootstrap context deadline should be extended too for that. 
- for i := 0; i < queries; i++ { + for i := 0; i < cfg.Queries; i++ { id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) runQuery(ctx, id) } @@ -166,13 +138,13 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error { // normally, we should be selecting on ctx.Done() here too, but this gets // complicated to do with WaitGroup, and doesnt wait for the children to exit. var wg sync.WaitGroup - for i := 0; i < queries; i++ { + for i := 0; i < cfg.Queries; i++ { wg.Add(1) go func() { defer wg.Done() id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) runQuery(ctx, id) }() } diff --git a/test/epictest/addcat_test.go b/test/epictest/addcat_test.go index 047c312e941..9c5486ed62b 100644 --- a/test/epictest/addcat_test.go +++ b/test/epictest/addcat_test.go @@ -115,8 +115,15 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { } defer catter.Close() - catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}) - adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}) + bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} + bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} + + if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + return err + } + if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + return err + } keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) if err != nil { diff --git a/test/epictest/three_legged_cat_test.go b/test/epictest/three_legged_cat_test.go index c86403ff70b..ff3f036a372 100644 --- a/test/epictest/three_legged_cat_test.go +++ b/test/epictest/three_legged_cat_test.go @@ -62,9 +62,15 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { return err } defer bootstrap.Close() - boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) - adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) - catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) + + bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) + bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis}) + if err := adder.Bootstrap(bcfg); err != nil { + return err + } + if err := catter.Bootstrap(bcfg); err != nil { + return err + } keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) if err != nil { From 4a5f5e2e2b1da51c56beebfdadc8e8669f6ff951 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Fri, 23 Jan 2015 04:36:32 -0800 Subject: [PATCH 19/20] reprovide: wait a minute before reproviding Many times, a node will start up only to shut down immediately. In these cases, reproviding is costly to both the node, and the rest of the network. Also note: the probability of a node being up another minute increases with uptime. 
TODO: maybe this should be 5 * time.Minute
---
 exchange/reprovide/reprovide.go |  5 ++++-
 pin/indirect.go                 |  2 +-
 routing/dht/dht_test.go         | 12 ++++++++++--
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go
index 4895e8ccbf1..1534b4ec4e5 100644
--- a/exchange/reprovide/reprovide.go
+++ b/exchange/reprovide/reprovide.go
@@ -30,7 +30,10 @@ func NewReprovider(rsys routing.IpfsRouting, bstore blocks.Blockstore) *Reprovid
 }
 
 func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) {
-	after := time.After(0)
+	// don't reprovide immediately.
+	// the user may have just started the daemon and may shut it down right away.
+	// probability( up another minute | uptime ) increases with uptime.
+	after := time.After(time.Minute)
 	for {
 		select {
 		case <-ctx.Done():
diff --git a/pin/indirect.go b/pin/indirect.go
index 9e67bc2c9ff..09decbb2549 100644
--- a/pin/indirect.go
+++ b/pin/indirect.go
@@ -32,7 +32,7 @@ func loadIndirPin(d ds.Datastore, k ds.Key) (*indirectPin, error) {
 		keys = append(keys, k)
 		refcnt[k] = v
 	}
-	log.Debugf("indirPin keys: %#v", keys)
+	// log.Debugf("indirPin keys: %#v", keys)
 
 	return &indirectPin{blockset: set.SimpleSetFromKeys(keys), refCounts: refcnt}, nil
 }
diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go
index 2e1e1129fa5..b7b9faf71d3 100644
--- a/routing/dht/dht_test.go
+++ b/routing/dht/dht_test.go
@@ -82,10 +82,14 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
 	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
 	// probably because results compound
 
+	var cfg BootstrapConfig
+	cfg = DefaultBootstrapConfig
+	cfg.Queries = 3
+
 	start := rand.Intn(len(dhts)) // randomize to decrease bias.
 	for i := range dhts {
 		dht := dhts[(start+i)%len(dhts)]
-		dht.runBootstrap(ctx, 3)
+		dht.runBootstrap(ctx, cfg)
 	}
 	cancel()
 }
@@ -356,11 +360,15 @@ func TestPeriodicBootstrap(t *testing.T) {
 	signal := make(chan time.Time)
 	allSignals := []chan time.Time{}
 
+	var cfg BootstrapConfig
+	cfg = DefaultBootstrapConfig
+	cfg.Queries = 5
+
 	// kick off periodic bootstrappers with instrumented signals.
 	for _, dht := range dhts {
 		s := make(chan time.Time)
 		allSignals = append(allSignals, s)
-		dht.BootstrapOnSignal(5, s)
+		dht.BootstrapOnSignal(cfg, s)
 	}
 	go amplify(signal, allSignals)
 
From 5c33b75b59bc40ff7f2ab11223dc6d0366904d55 Mon Sep 17 00:00:00 2001
From: Juan Batiz-Benet
Date: Fri, 23 Jan 2015 05:24:23 -0800
Subject: [PATCH 20/20] p2p/net/conn: timeouts are real failures.

---
 p2p/net/conn/dial.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go
index 33f6e07b282..adf08711050 100644
--- a/p2p/net/conn/dial.go
+++ b/p2p/net/conn/dial.go
@@ -147,6 +147,11 @@ func reuseErrShouldRetry(err error) bool {
 		return false // hey, it worked! no need to retry.
 	}
 
+	// if it's a network timeout error, it's a legitimate failure. do not retry.
+	if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
+		return false
+	}
+
 	errno, ok := err.(syscall.Errno)
 	if !ok { // not an errno? who knows what this is. retry.
 		return true
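
To make the end-to-end shape of the new bootstrap API easier to see, here is a minimal usage sketch. It is not part of the patches: it relies only on the signatures introduced above (BootstrapConfig, DefaultBootstrapConfig, IpfsDHT.Bootstrap, IpfsDHT.BootstrapOnSignal), assumes the caller has already constructed an *IpfsDHT, and the helper name bootstrapUsageSketch is made up for illustration.

package dht

import (
	"testing"
	"time"
)

// bootstrapUsageSketch drives the periodic bootstrapper two ways: with the
// built-in ticker (production path) and with an instrumented signal channel
// (the pattern TestPeriodicBootstrap uses above).
func bootstrapUsageSketch(t *testing.T, d *IpfsDHT) {
	cfg := DefaultBootstrapConfig
	cfg.Queries = 3

	// Production path: Bootstrap starts a goprocess that runs cfg.Queries
	// random-ID lookups every cfg.Period.
	proc, err := d.Bootstrap(cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer proc.Close() // closing the process stops periodic bootstrapping

	// Test path: replace the ticker with an explicit channel, so the test
	// decides exactly when a bootstrap round runs.
	signal := make(chan time.Time)
	sigProc, err := d.BootstrapOnSignal(cfg, signal)
	if err != nil {
		t.Fatal(err)
	}
	defer sigProc.Close()
	signal <- time.Now() // fires one round of cfg.Queries queries
}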
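The intent of PATCH 20 is that a dial timeout signals a genuinely unreachable remote rather than a local port-reuse quirk, so it should be surfaced instead of retried. The standalone snippet below is separate from the patch and uses only the standard library; it shows the net.Error check in isolation, with a TEST-NET-1 address chosen only because dialing it with a short deadline is expected to time out.

package main

import (
	"fmt"
	"net"
	"time"
)

// isTimeout mirrors the check added to reuseErrShouldRetry: any error that
// implements net.Error and reports Timeout() is treated as a real failure.
func isTimeout(err error) bool {
	nerr, ok := err.(net.Error)
	return ok && nerr.Timeout()
}

func main() {
	// 192.0.2.1 is in TEST-NET-1 (RFC 5737), a documentation range that is
	// not normally routed, so this dial should hit the 50ms deadline.
	_, err := net.DialTimeout("tcp", "192.0.2.1:80", 50*time.Millisecond)
	fmt.Printf("err=%v, treat as real failure=%v\n", err, isTimeout(err))
}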