diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index f7c7ab7cc7e..5c85e41170e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -172,7 +172,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8" + "Rev": "c37725a4a97d6ad772818b071ceef82789562142" }, { "ImportPath": "github.com/kr/binarydist", diff --git a/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go b/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go index 28168d00e22..0ad8a7beea8 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-logging/examples/example.go @@ -3,7 +3,7 @@ package main import ( "os" - "github.com/op/go-logging" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging" ) var log = logging.MustGetLogger("example") diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml new file mode 100644 index 00000000000..7669438ed9a --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml @@ -0,0 +1,11 @@ +language: go + +go: + - 1.2 + - 1.3 + - 1.4 + - release + - tip + +script: + - go test -v ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE b/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE new file mode 100644 index 00000000000..c7386b3c940 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Juan Batiz-Benet + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
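The hunks below vendor the goprocess library; its package comment (in goprocess.go) walks through the WithTeardown / Closing / Close / Closed lifecycle. As a reading aid, here is a minimal usage sketch. It is illustrative and not part of the patch, and it assumes only the calls that appear in the vendored sources below (WithTeardown, Process.Go, Closing, Close, Closed).

package main

import (
	"fmt"
	"time"

	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)

func main() {
	// a process whose teardown func runs exactly once, after all children close.
	p := goprocess.WithTeardown(func() error {
		fmt.Println("teardown")
		return nil
	})

	// a child goroutine tied to p's lifecycle.
	p.Go(func(child goprocess.Process) {
		select {
		case <-time.After(100 * time.Millisecond):
			fmt.Println("child finished its work")
		case <-child.Closing(): // parent began shutting down first
		}
	})

	p.Close()    // blocks: children close first, then teardown runs
	<-p.Closed() // already closed by the time Close returns
}
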
diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md b/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md index a19ed31978c..e2f12e16d65 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/README.md @@ -1,5 +1,7 @@ # goprocess - lifecycles in go +[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess) + (Based on https://github.com/jbenet/go-ctxgroup) - Godoc: https://godoc.org/github.com/jbenet/goprocess diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index afe848c6136..762cecb2075 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -18,7 +18,7 @@ import ( // More specifically, it fits this: // // p := WithTeardown(tf) // new process is created, it is now running. -// p.AddChild(q) // can register children **before** Closing. +// p.AddChild(q) // can register children **before** Closed(). // go p.Close() // blocks until done running teardown func. // <-p.Closing() // would now return true. // <-p.childrenDone() // wait on all children to be done diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go deleted file mode 100644 index 831dc938fb0..00000000000 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-goroutines.go +++ /dev/null @@ -1,114 +0,0 @@ -// +build ignore - -// WARNING: this implementation is not correct. -// here only for historical purposes. - -package goprocess - -import ( - "sync" -) - -// process implements Process -type process struct { - children sync.WaitGroup // wait group for child goroutines - teardown TeardownFunc // called to run the teardown logic. - closing chan struct{} // closed once close starts. - closed chan struct{} // closed once close is done. - closeOnce sync.Once // ensure close is only called once. - closeErr error // error to return to clients of Close() -} - -// newProcess constructs and returns a Process. -// It will call tf TeardownFunc exactly once: -// **after** all children have fully Closed, -// **after** entering <-Closing(), and -// **before** <-Closed(). 
-func newProcess(tf TeardownFunc) *process { - if tf == nil { - tf = nilTeardownFunc - } - - return &process{ - teardown: tf, - closed: make(chan struct{}), - closing: make(chan struct{}), - } -} - -func (p *process) WaitFor(q Process) { - p.children.Add(1) // p waits on q to be done - go func(p *process, q Process) { - <-q.Closed() // wait until q is closed - p.children.Done() // p done waiting on q - }(p, q) -} - -func (p *process) AddChildNoWait(child Process) { - go func(p, child Process) { - <-p.Closing() // wait until p is closing - child.Close() // close child - }(p, child) -} - -func (p *process) AddChild(child Process) { - select { - case <-p.Closing(): - panic("attempt to add child to closing or closed process") - default: - } - - p.children.Add(1) // p waits on child to be done - go func(p *process, child Process) { - <-p.Closing() // wait until p is closing - child.Close() // close child and wait - p.children.Done() // p done waiting on child - }(p, child) -} - -func (p *process) Go(f ProcessFunc) Process { - select { - case <-p.Closing(): - panic("attempt to add child to closing or closed process") - default: - } - - // this is very similar to AddChild, but also runs the func - // in the child. we replicate it here to save one goroutine. - child := newProcessGoroutines(nil) - child.children.Add(1) // child waits on func to be done - p.AddChild(child) - go func() { - f(child) - child.children.Done() // wait on child's children to be done. - child.Close() // close to tear down. - }() - return child -} - -// Close is the external close function. -// it's a wrapper around internalClose that waits on Closed() -func (p *process) Close() error { - p.closeOnce.Do(p.doClose) - <-p.Closed() // sync.Once should block, but this checks chan is closed too - return p.closeErr -} - -func (p *process) Closing() <-chan struct{} { - return p.closing -} - -func (p *process) Closed() <-chan struct{} { - return p.closed -} - -// the _actual_ close process. -func (p *process) doClose() { - // this function should only be called once (hence the sync.Once). - // and it will panic (on closing channels) otherwise. - - close(p.closing) // signal that we're shutting down (Closing) - p.children.Wait() // wait till all children are done (before teardown) - p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) - close(p.closed) // signal that we're shut down (Closed) -} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index ed68b9a0349..633d5b056ab 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -92,16 +92,18 @@ func (p *process) Go(f ProcessFunc) Process { // it's a wrapper around internalClose that waits on Closed() func (p *process) Close() error { p.Lock() - defer p.Unlock() - // if already closed, get out. + // if already closing, or closed, get out. (but wait!) 
select { - case <-p.Closed(): + case <-p.Closing(): + p.Unlock() + <-p.Closed() return p.closeErr default: } p.doClose() + p.Unlock() return p.closeErr } @@ -120,12 +122,23 @@ func (p *process) doClose() { close(p.closing) // signal that we're shutting down (Closing) - for _, c := range p.children { - go c.Close() // force all children to shut down - } - - for _, w := range p.waitfors { - <-w.Closed() // wait till all waitfors are fully closed (before teardown) + for len(p.children) > 0 || len(p.waitfors) > 0 { + for _, c := range p.children { + go c.Close() // force all children to shut down + } + p.children = nil // clear them + + // we must be careful not to iterate over waitfors directly, as it may + // change under our feet. + wf := p.waitfors + p.waitfors = nil // clear them + for _, w := range wf { + // Here, we wait UNLOCKED, so that waitfors who are in the middle of + // adding a child to us can finish. we will immediately close the child. + p.Unlock() + <-w.Closed() // wait till all waitfors are fully closed (before teardown) + p.Lock() + } } p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md new file mode 100644 index 00000000000..7a2c55db1c6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/README.md @@ -0,0 +1,4 @@ +# goprocess/periodic - periodic process creation + +- goprocess: https://github.com/jbenet/goprocess +- Godoc: https://godoc.org/github.com/jbenet/goprocess/periodic diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go new file mode 100644 index 00000000000..782353db158 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/examples_test.go @@ -0,0 +1,85 @@ +package periodicproc_test + +import ( + "fmt" + "time" + + goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" +) + +func ExampleEvery() { + tock := make(chan struct{}) + + i := 0 + p := periodicproc.Every(time.Second, func(proc goprocess.Process) { + tock <- struct{}{} + fmt.Printf("hello %d\n", i) + i++ + }) + + <-tock + <-tock + <-tock + p.Close() + + // Output: + // hello 0 + // hello 1 + // hello 2 +} + +func ExampleTick() { + p := periodicproc.Tick(time.Second, func(proc goprocess.Process) { + fmt.Println("tick") + }) + + <-time.After(3*time.Second + 500*time.Millisecond) + p.Close() + + // Output: + // tick + // tick + // tick +} + +func ExampleTickGo() { + + // with TickGo, execution is not rate limited, + // there can be many in-flight simultaneously + + wait := make(chan struct{}) + p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) { + fmt.Println("tick") + <-wait + }) + + <-time.After(3*time.Second + 500*time.Millisecond) + + wait <- struct{}{} + wait <- struct{}{} + wait <- struct{}{} + p.Close() // blocks us until all children are closed. + + // Output: + // tick + // tick + // tick +} + +func ExampleOnSignal() { + sig := make(chan struct{}) + p := periodicproc.OnSignal(sig, func(proc goprocess.Process) { + fmt.Println("fire!") + }) + + sig <- struct{}{} + sig <- struct{}{} + sig <- struct{}{} + p.Close() + + // Output: + // fire! + // fire! + // fire! 
+} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go new file mode 100644 index 00000000000..ce1c4611e9c --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic.go @@ -0,0 +1,232 @@ +// Package periodic is part of github.com/jbenet/goprocess. +// It provides a simple periodic processor that calls a function +// periodically based on some options. +// +// For example: +// +// // use a time.Duration +// p := periodicproc.Every(time.Second, func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// <-time.After(5*time.Second) +// p.Close() +// +// // use a time.Time channel (like time.Ticker) +// p := periodicproc.Tick(time.Tick(time.Second), func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// <-time.After(5*time.Second) +// p.Close() +// +// // or arbitrary signals +// signal := make(chan struct{}) +// p := periodicproc.OnSignal(signal, func(proc goprocess.Process) { +// fmt.Printf("the time is %s and all is well", time.Now()) +// }) +// +// signal<- struct{}{} +// signal<- struct{}{} +// <-time.After(5 * time.Second) +// signal<- struct{}{} +// p.Close() +// +package periodicproc + +import ( + "time" + + gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +// Every calls the given ProcessFunc at periodic intervals. Internally, it uses +// <-time.After(interval), so it will have the behavior of waiting _at least_ +// interval in between calls. If you'd prefer the time.Ticker behavior, use +// periodicproc.Tick instead. +// This is sequentially rate limited, only one call will be in-flight at a time. +func Every(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-time.After(interval): + select { + case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// EveryGo calls the given ProcessFunc at periodic intervals. Internally, it uses +// <-time.After(interval) +// This is not rate limited, multiple calls could be in-flight at the same time. +func EveryGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-time.After(interval): + proc.Go(procfunc) + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// Tick constructs a ticker with interval, and calls the given ProcessFunc every +// time the ticker fires. +// This is sequentially rate limited, only one call will be in-flight at a time. +// +// p := periodicproc.Tick(time.Second, func(proc goprocess.Process) { +// fmt.Println("fire!") +// }) +// +// <-time.After(3 * time.Second) +// p.Close() +// +// // Output: +// // fire! +// // fire! +// // fire! +func Tick(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + ticker := time.NewTicker(interval) + callOnTicker(ticker.C, procfunc)(proc) + ticker.Stop() + }) +} + +// TickGo constructs a ticker with interval, and calls the given ProcessFunc every +// time the ticker fires. +// This is not rate limited, multiple calls could be in-flight at the same time. 
+// +// p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(10 * time.Second) // will not block sequential execution +// }) +// +// <-time.After(3 * time.Second) +// p.Close() +// +// // Output: +// // fire! +// // fire! +// // fire! +func TickGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + ticker := time.NewTicker(interval) + goCallOnTicker(ticker.C, procfunc)(proc) + ticker.Stop() + }) +} + +// Ticker calls the given ProcessFunc every time the ticker fires. +// This is sequentially rate limited, only one call will be in-flight at a time. +func Ticker(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(callOnTicker(ticker, procfunc)) +} + +// TickerGo calls the given ProcessFunc every time the ticker fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +func TickerGo(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(goCallOnTicker(ticker, procfunc)) +} + +func callOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc { + return func(proc gp.Process) { + for { + select { + case <-ticker: + select { + case <-proc.Go(pf).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + } +} + +func goCallOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc { + return func(proc gp.Process) { + for { + select { + case <-ticker: + proc.Go(pf) + case <-proc.Closing(): // we're told to close + return + } + } + } +} + +// OnSignal calls the given ProcessFunc every time the signal fires, and waits for it to exit. +// This is sequentially rate limited, only one call will be in-flight at a time. +// +// sig := make(chan struct{}) +// p := periodicproc.OnSignal(sig, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(time.Second) // delays sequential execution by 1 second +// }) +// +// sig<- struct{} +// sig<- struct{} +// sig<- struct{} +// +// // Output: +// // fire! +// // fire! +// // fire! +func OnSignal(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-sig: + select { + case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done. + case <-proc.Closing(): // we're told to close + return + } + case <-proc.Closing(): // we're told to close + return + } + } + }) +} + +// OnSignalGo calls the given ProcessFunc every time the signal fires. +// This is not rate limited, multiple calls could be in-flight at the same time. +// +// sig := make(chan struct{}) +// p := periodicproc.OnSignalGo(sig, func(proc goprocess.Process) { +// fmt.Println("fire!") +// <-time.After(time.Second) // wont block execution +// }) +// +// sig<- struct{} +// sig<- struct{} +// sig<- struct{} +// +// // Output: +// // fire! +// // fire! +// // fire! 
+func OnSignalGo(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process { + return gp.Go(func(proc gp.Process) { + for { + select { + case <-sig: + proc.Go(procfunc) + case <-proc.Closing(): // we're told to close + return + } + } + }) +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go new file mode 100644 index 00000000000..c79ed50c64d --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go @@ -0,0 +1,260 @@ +package periodicproc + +import ( + "testing" + "time" + + ci "github.com/jbenet/go-cienv" + gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" +) + +var ( + grace = time.Millisecond * 5 + interval = time.Millisecond * 10 + timeout = time.Second * 5 +) + +func init() { + if ci.IsRunning() { + grace = time.Millisecond * 500 + interval = time.Millisecond * 1000 + timeout = time.Second * 15 + } +} + +func between(min, diff, max time.Duration) bool { + return min <= diff && diff <= max +} + +func testBetween(t *testing.T, min, diff, max time.Duration) { + if !between(min, diff, max) { + t.Error("time diff incorrect:", min, diff, max) + } +} + +type intervalFunc func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) + +func testSeq(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + p := toTest(times, nil) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + last = next + } + + go p.Close() + select { + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testSeqWait(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + + <-time.After(interval * 2) // make it wait. + last = time.Now() // make it now (sequential) + wait <- struct{}{} // release it. + } + + go p.Close() + + select { + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testSeqNoWait(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, 0, next.Sub(last), interval+grace) // min of 0 + + <-time.After(interval * 2) // make it wait. + last = time.Now() // make it now (sequential) + wait <- struct{}{} // release it. + } + + go p.Close() + +end: + select { + case wait <- struct{}{}: // drain any extras. + goto end + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func testParallel(t *testing.T, toTest intervalFunc) { + t.Parallel() + + last := time.Now() + times := make(chan time.Time, 10) + wait := make(chan struct{}) + p := toTest(times, wait) + + for i := 0; i < 5; i++ { + next := <-times + testBetween(t, interval-grace, next.Sub(last), interval+grace) + last = next + + <-time.After(interval * 2) // make it wait. + wait <- struct{}{} // release it. + } + + go p.Close() + +end: + select { + case wait <- struct{}{}: // drain any extras. 
+ goto end + case <-p.Closed(): + case <-time.After(timeout): + t.Error("proc failed to close") + } +} + +func TestEverySeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Every(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestEverySeqWait(t *testing.T) { + testSeqWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Every(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestEveryGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return EveryGo(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestEveryGoSeqParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return EveryGo(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Tick(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickSeqNoWait(t *testing.T) { + testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Tick(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickGo(interval, func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickGoSeqParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickGo(interval, func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickerSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Ticker(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickerSeqNoWait(t *testing.T) { + testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return Ticker(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} + +func TestTickerGoSeq(t *testing.T) { + testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickerGo(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + }) + }) +} + +func TestTickerGoParallel(t *testing.T) { + testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) { + return TickerGo(time.Tick(interval), func(proc gp.Process) { + times <- time.Now() + select { + case <-wait: + case <-proc.Closing(): + } + }) + }) +} diff --git a/core/bootstrap.go b/core/bootstrap.go index 72a8be52508..84bf8e65203 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -2,6 +2,9 @@ package core import ( "errors" + "fmt" + "io" + "io/ioutil" "math/rand" "sync" "time" @@ -16,119 +19,213 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + goprocess 
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" + periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic" ) -const ( - period = 30 * time.Second // how often to check connection status - connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect - recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value - numDHTBootstrapQueries = 15 // number of DHT queries to execute -) +// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap +// peers to bootstrap correctly. +var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap") + +// BootstrapConfig specifies parameters used in an IpfsNode's network +// bootstrapping process. +type BootstrapConfig struct { + + // MinPeerThreshold governs whether to bootstrap more connections. If the + // node has less open connections than this number, it will open connections + // to the bootstrap nodes. From there, the routing system should be able + // to use the connections to the bootstrap nodes to connect to even more + // peers. Routing systems like the IpfsDHT do so in their own Bootstrap + // process, which issues random queries to find more peers. + MinPeerThreshold int + + // Period governs the periodic interval at which the node will + // attempt to bootstrap. The bootstrap process is not very expensive, so + // this threshold can afford to be small (<=30s). + Period time.Duration + + // ConnectionTimeout determines how long to wait for a bootstrap + // connection attempt before cancelling it. + ConnectionTimeout time.Duration + + // BootstrapPeers is a function that returns a set of bootstrap peers + // for the bootstrap process to use. This makes it possible for clients + // to control the peers the process uses at any moment. + BootstrapPeers func() []peer.PeerInfo +} -func superviseConnections(parent context.Context, - h host.Host, - route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes - store peer.Peerstore, - peers []peer.PeerInfo) error { +// DefaultBootstrapConfig specifies default sane parameters for bootstrapping. +var DefaultBootstrapConfig = BootstrapConfig{ + MinPeerThreshold: 4, + Period: 30 * time.Second, + ConnectionTimeout: (30 * time.Second) / 3, // Perod / 3 +} - for { - ctx, _ := context.WithTimeout(parent, connectiontimeout) - // TODO get config from disk so |peers| always reflects the latest - // information - if err := bootstrap(ctx, h, route, store, peers); err != nil { - log.Error(err) - } - select { - case <-parent.Done(): - return parent.Err() - case <-time.Tick(period): - } +func BootstrapConfigWithPeers(pis []peer.PeerInfo) BootstrapConfig { + cfg := DefaultBootstrapConfig + cfg.BootstrapPeers = func() []peer.PeerInfo { + return pis } - return nil + return cfg } -func bootstrap(ctx context.Context, - h host.Host, - r *dht.IpfsDHT, - ps peer.Peerstore, - bootstrapPeers []peer.PeerInfo) error { +// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically +// check the number of open connections and -- if there are too few -- initiate +// connections to well-known bootstrap peers. It also kicks off subsystem +// bootstrapping (i.e. routing). +func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { + + // TODO what bootstrapping should happen if there is no DHT? i.e. 
we could + // continue connecting to our bootstrap peers, but for what purpose? for now + // simply exit without connecting to any of them. When we introduce another + // routing system that uses bootstrap peers we can change this. + thedht, ok := n.Routing.(*dht.IpfsDHT) + if !ok { + return ioutil.NopCloser(nil), nil + } - connectedPeers := h.Network().Peers() - if len(connectedPeers) >= recoveryThreshold { - log.Event(ctx, "bootstrapSkip", h.ID()) - log.Debugf("%s bootstrap skipped -- connected to %d (> %d) nodes", - h.ID(), len(connectedPeers), recoveryThreshold) + // the periodic bootstrap function -- the connection supervisor + periodic := func(worker goprocess.Process) { + ctx := procctx.WithProcessClosing(context.Background(), worker) + defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() - return nil + if err := bootstrapRound(ctx, n.PeerHost, thedht, n.Peerstore, cfg); err != nil { + log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.Identity, err) + } + } + + // kick off the node's periodic bootstrapping + proc := periodicproc.Tick(cfg.Period, periodic) + proc.Go(periodic) // run one right now. + + // kick off dht bootstrapping. + dbproc, err := thedht.Bootstrap(dht.DefaultBootstrapConfig) + if err != nil { + proc.Close() + return nil, err } - numCxnsToCreate := recoveryThreshold - len(connectedPeers) - log.Event(ctx, "bootstrapStart", h.ID()) - log.Debugf("%s bootstrapping to %d more nodes", h.ID(), numCxnsToCreate) + // add dht bootstrap proc as a child, so it is closed automatically when we are. + proc.AddChild(dbproc) + return proc, nil +} +func bootstrapRound(ctx context.Context, + host host.Host, + route *dht.IpfsDHT, + peerstore peer.Peerstore, + cfg BootstrapConfig) error { + + ctx, _ = context.WithTimeout(ctx, cfg.ConnectionTimeout) + id := host.ID() + + // get bootstrap peers from config. retrieving them here makes + // sure we remain observant of changes to client configuration. + peers := cfg.BootstrapPeers() + + // determine how many bootstrap connections to open + connected := host.Network().Peers() + if len(connected) >= cfg.MinPeerThreshold { + log.Event(ctx, "bootstrapSkip", id) + log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes", + id, len(connected), cfg.MinPeerThreshold) + return nil + } + numToDial := cfg.MinPeerThreshold - len(connected) + + // filter out bootstrap nodes we are already connected to var notConnected []peer.PeerInfo - for _, p := range bootstrapPeers { - if h.Network().Connectedness(p.ID) != inet.Connected { + for _, p := range peers { + if host.Network().Connectedness(p.ID) != inet.Connected { notConnected = append(notConnected, p) } } - // if not connected to all bootstrap peer candidates - if len(notConnected) > 0 { - var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) - log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset) - if err := connect(ctx, ps, r, randomSubset); err != nil { - log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", h.ID(), err) - return err - } + // if connected to all bootstrap peer candidates, exit + if len(notConnected) < 1 { + log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial) + return ErrNotEnoughBootstrapPeers } - // we can try running dht bootstrap even if we're connected to all bootstrap peers. 
- if len(h.Network().Conns()) > 0 { - if err := r.Bootstrap(ctx, numDHTBootstrapQueries); err != nil { - // log this as Info. later on, discern better between errors. - log.Infof("dht bootstrap err: %s", err) - return nil - } + // connect to a random susbset of bootstrap candidates + randSubset := randomSubsetOfPeers(notConnected, numToDial) + + defer log.EventBegin(ctx, "bootstrapStart", id).Done() + log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset) + if err := bootstrapConnect(ctx, peerstore, route, randSubset); err != nil { + return err } return nil } -func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error { +func bootstrapConnect(ctx context.Context, + ps peer.Peerstore, + route *dht.IpfsDHT, + peers []peer.PeerInfo) error { if len(peers) < 1 { - return errors.New("bootstrap set empty") + return ErrNotEnoughBootstrapPeers } + errs := make(chan error, len(peers)) var wg sync.WaitGroup for _, p := range peers { // performed asynchronously because when performed synchronously, if // one `Connect` call hangs, subsequent calls are more likely to // fail/abort due to an expiring context. + // Also, performed asynchronously for dial speed. wg.Add(1) go func(p peer.PeerInfo) { defer wg.Done() - log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID) - log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID) + defer log.EventBegin(ctx, "bootstrapDial", route.LocalPeer(), p.ID).Done() + log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID) ps.AddAddresses(p.ID, p.Addrs) - err := r.Connect(ctx, p.ID) + err := route.Connect(ctx, p.ID) if err != nil { - log.Event(ctx, "bootstrapFailed", p.ID) - log.Criticalf("failed to bootstrap with %v: %s", p.ID, err) + log.Event(ctx, "bootstrapDialFailed", p.ID) + log.Errorf("failed to bootstrap with %v: %s", p.ID, err) + errs <- err return } - log.Event(ctx, "bootstrapSuccess", p.ID) + log.Event(ctx, "bootstrapDialSuccess", p.ID) log.Infof("bootstrapped with %v", p.ID) }(p) } wg.Wait() + + // our failure condition is when no connection attempt succeeded. + // So drain the errs channel, counting the results. + close(errs) + count := 0 + var err error + for err = range errs { + if err != nil { + count++ + } + } + if count == len(peers) { + return fmt.Errorf("failed to bootstrap. %s", err) + } return nil } -func toPeer(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) { +func toPeerInfos(bpeers []config.BootstrapPeer) ([]peer.PeerInfo, error) { + var peers []peer.PeerInfo + for _, bootstrap := range bpeers { + p, err := toPeerInfo(bootstrap) + if err != nil { + return nil, err + } + peers = append(peers, p) + } + return peers, nil +} + +func toPeerInfo(bootstrap config.BootstrapPeer) (p peer.PeerInfo, err error) { id, err := peer.IDB58Decode(bootstrap.PeerID) if err != nil { return diff --git a/core/commands/diag.go b/core/commands/diag.go index ca42858710c..98790f67400 100644 --- a/core/commands/diag.go +++ b/core/commands/diag.go @@ -36,6 +36,8 @@ type DiagnosticOutput struct { Peers []DiagnosticPeer } +var DefaultDiagnosticTimeout = time.Second * 20 + var DiagCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Generates diagnostic reports", @@ -57,6 +59,7 @@ connected peers and latencies between them. }, Options: []cmds.Option{ + cmds.StringOption("timeout", "diagnostic timeout duration"), cmds.StringOption("vis", "output vis. one of: "+strings.Join(visFmts, ", ")), }, @@ -75,7 +78,20 @@ connected peers and latencies between them. 
return nil, err } - info, err := n.Diagnostics.GetDiagnostic(time.Second * 20) + timeoutS, _, err := req.Option("timeout").String() + if err != nil { + return nil, err + } + timeout := DefaultDiagnosticTimeout + if timeoutS != "" { + t, err := time.ParseDuration(timeoutS) + if err != nil { + return nil, cmds.ClientError("error parsing timeout") + } + timeout = t + } + + info, err := n.Diagnostics.GetDiagnostic(timeout) if err != nil { return nil, err } diff --git a/core/commands/swarm.go b/core/commands/swarm.go index 6fa3c8e93d4..4fc53022d25 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "path" + "sort" cmds "github.com/jbenet/go-ipfs/commands" peer "github.com/jbenet/go-ipfs/p2p/peer" @@ -64,6 +65,7 @@ ipfs swarm peers lists the set of peers this node is connected to. addrs[i] = fmt.Sprintf("%s/%s", addr, pid.Pretty()) } + sort.Sort(sort.StringSlice(addrs)) return &stringList{addrs}, nil }, Marshalers: cmds.MarshalerMap{ diff --git a/core/core.go b/core/core.go index dd922b7afd8..a2839f610dd 100644 --- a/core/core.go +++ b/core/core.go @@ -11,33 +11,36 @@ import ( datastore "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" + debugerror "github.com/jbenet/go-ipfs/util/debugerror" + + diag "github.com/jbenet/go-ipfs/diagnostics" + ic "github.com/jbenet/go-ipfs/p2p/crypto" + p2phost "github.com/jbenet/go-ipfs/p2p/host" + p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic" + swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" + addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" + peer "github.com/jbenet/go-ipfs/p2p/peer" + + routing "github.com/jbenet/go-ipfs/routing" + dht "github.com/jbenet/go-ipfs/routing/dht" + offroute "github.com/jbenet/go-ipfs/routing/offline" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bserv "github.com/jbenet/go-ipfs/blockservice" - diag "github.com/jbenet/go-ipfs/diagnostics" exchange "github.com/jbenet/go-ipfs/exchange" bitswap "github.com/jbenet/go-ipfs/exchange/bitswap" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" offline "github.com/jbenet/go-ipfs/exchange/offline" rp "github.com/jbenet/go-ipfs/exchange/reprovide" + mount "github.com/jbenet/go-ipfs/fuse/mount" merkledag "github.com/jbenet/go-ipfs/merkledag" namesys "github.com/jbenet/go-ipfs/namesys" - ic "github.com/jbenet/go-ipfs/p2p/crypto" - p2phost "github.com/jbenet/go-ipfs/p2p/host" - p2pbhost "github.com/jbenet/go-ipfs/p2p/host/basic" - swarm "github.com/jbenet/go-ipfs/p2p/net/swarm" - addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" - peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" pin "github.com/jbenet/go-ipfs/pin" repo "github.com/jbenet/go-ipfs/repo" config "github.com/jbenet/go-ipfs/repo/config" - routing "github.com/jbenet/go-ipfs/routing" - dht "github.com/jbenet/go-ipfs/routing/dht" - offroute "github.com/jbenet/go-ipfs/routing/offline" - eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog" - debugerror "github.com/jbenet/go-ipfs/util/debugerror" - lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" ) const IpnsValidatorTag = "ipns" @@ -75,13 +78,14 @@ type IpfsNode struct { Resolver *path.Resolver // the path resolution system // Online - PrivateKey ic.PrivKey // the local node's private Key - PeerHost p2phost.Host // the network host (server+client) - Routing routing.IpfsRouting // the routing 
system. recommend ipfs-dht - Exchange exchange.Interface // the block exchange + strategy (bitswap) - Namesys namesys.NameSystem // the name system, resolves paths to hashes - Diagnostics *diag.Diagnostics // the diagnostics service - Reprovider *rp.Reprovider // the value reprovider system + PrivateKey ic.PrivKey // the local node's private Key + PeerHost p2phost.Host // the network host (server+client) + Bootstrapper io.Closer // the periodic bootstrapper + Routing routing.IpfsRouting // the routing system. recommend ipfs-dht + Exchange exchange.Interface // the block exchange + strategy (bitswap) + Namesys namesys.NameSystem // the name system, resolves paths to hashes + Diagnostics *diag.Diagnostics // the diagnostics service + Reprovider *rp.Reprovider // the value reprovider system ctxgroup.ContextGroup @@ -235,29 +239,10 @@ func (n *IpfsNode) StartOnlineServices(ctx context.Context) error { // TODO implement an offline namesys that serves only local names. n.Namesys = namesys.NewNameSystem(n.Routing) - // TODO consider moving connection supervision into the Network. We've - // discussed improvements to this Node constructor. One improvement - // would be to make the node configurable, allowing clients to inject - // an Exchange, Network, or Routing component and have the constructor - // manage the wiring. In that scenario, this dangling function is a bit - // awkward. - var bootstrapPeers []peer.PeerInfo - for _, bootstrap := range n.Repo.Config().Bootstrap { - p, err := toPeer(bootstrap) - if err != nil { - log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err)) - log.Errorf("%s bootstrap error: %s", n.Identity, err) - return err - } - bootstrapPeers = append(bootstrapPeers, p) - } - - go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, bootstrapPeers) - n.Reprovider = rp.NewReprovider(n.Routing, n.Blockstore) go n.Reprovider.ProvideEvery(ctx, kReprovideFrequency) - return nil + return n.Bootstrap(DefaultBootstrapConfig) } // teardown closes owned children. If any errors occur, this function returns @@ -266,20 +251,20 @@ func (n *IpfsNode) teardown() error { // owned objects are closed in this teardown to ensure that they're closed // regardless of which constructor was used to add them to the node. var closers []io.Closer - if n.Repo != nil { - closers = append(closers, n.Repo) - } - if n.Blocks != nil { - closers = append(closers, n.Blocks) - } - if n.Routing != nil { - if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - closers = append(closers, dht) + addCloser := func(c io.Closer) { + if c != nil { + closers = append(closers, c) } } - if n.PeerHost != nil { - closers = append(closers, n.PeerHost) + + addCloser(n.Bootstrapper) + addCloser(n.Repo) + addCloser(n.Blocks) + if dht, ok := n.Routing.(*dht.IpfsDHT); ok { + addCloser(dht) } + addCloser(n.PeerHost) + var errs []error for _, closer := range closers { if err := closer.Close(); err != nil { @@ -305,17 +290,34 @@ func (n *IpfsNode) Resolve(path string) (*merkledag.Node, error) { return n.Resolver.ResolvePath(path) } -// Bootstrap is undefined when node is not in OnlineMode -func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error { +func (n *IpfsNode) Bootstrap(cfg BootstrapConfig) error { // TODO what should return value be when in offlineMode? 
+ if n.Routing == nil { + return nil + } - if n.Routing != nil { - if dht, ok := n.Routing.(*dht.IpfsDHT); ok { - return bootstrap(ctx, n.PeerHost, dht, n.Peerstore, peers) + if n.Bootstrapper != nil { + n.Bootstrapper.Close() // stop previous bootstrap process. + } + + // if the caller did not specify a bootstrap peer function, get the + // freshest bootstrap peers from config. this responds to live changes. + if cfg.BootstrapPeers == nil { + cfg.BootstrapPeers = func() []peer.PeerInfo { + bpeers := n.Repo.Config().Bootstrap + ps, err := toPeerInfos(bpeers) + if err != nil { + log.Error("failed to parse bootstrap peers from config: %s", bpeers) + return nil + } + return ps } } - return nil + + var err error + n.Bootstrapper, err = Bootstrap(n, cfg) + return err } func (n *IpfsNode) loadID() error { diff --git a/diagnostics/diag.go b/diagnostics/diag.go index a14f7d239f0..dbdf2be8613 100644 --- a/diagnostics/diag.go +++ b/diagnostics/diag.go @@ -4,10 +4,9 @@ package diagnostics import ( - "bytes" "encoding/json" "errors" - "io" + "fmt" "sync" "time" @@ -16,6 +15,7 @@ import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + ctxutil "github.com/jbenet/go-ipfs/util/ctx" host "github.com/jbenet/go-ipfs/p2p/host" inet "github.com/jbenet/go-ipfs/p2p/net" @@ -31,7 +31,10 @@ var log = util.Logger("diagnostics") // ProtocolDiag is the diagnostics protocol.ID var ProtocolDiag protocol.ID = "/ipfs/diagnostics" +var ErrAlreadyRunning = errors.New("diagnostic with that ID already running") + const ResponseTimeout = time.Second * 10 +const HopTimeoutDecrement = time.Second * 2 // Diagnostics is a net service that manages requesting and responding to diagnostic // requests @@ -148,80 +151,69 @@ func (d *Diagnostics) GetDiagnostic(timeout time.Duration) ([]*DiagInfo, error) peers := d.getPeers() log.Debugf("Sending diagnostic request to %d peers.", len(peers)) - var out []*DiagInfo - di := d.getDiagInfo() - out = append(out, di) - pmes := newMessage(diagID) - respdata := make(chan []byte) - sends := 0 - for p, _ := range peers { - log.Debugf("Sending getDiagnostic to: %s", p) - sends++ - go func(p peer.ID) { - data, err := d.getDiagnosticFromPeer(ctx, p, pmes) - if err != nil { - log.Errorf("GetDiagnostic error: %v", err) - respdata <- nil - return - } - respdata <- data - }(p) + pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) // decrease timeout per hop + dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes) + if err != nil { + return nil, fmt.Errorf("diagnostic from peers err: %s", err) } - for i := 0; i < sends; i++ { - data := <-respdata - if data == nil { - continue - } - out = appendDiagnostics(data, out) + di := d.getDiagInfo() + out := []*DiagInfo{di} + for dpi := range dpeers { + out = append(out, dpi) } return out, nil } -func appendDiagnostics(data []byte, cur []*DiagInfo) []*DiagInfo { - buf := bytes.NewBuffer(data) - dec := json.NewDecoder(buf) - for { - di := new(DiagInfo) - err := dec.Decode(di) - if err != nil { - if err != io.EOF { - log.Errorf("error decoding DiagInfo: %v", err) - } - break - } - cur = append(cur, di) - } - return cur -} - -// TODO: this method no longer needed. 
-func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, mes *pb.Message) ([]byte, error) { - rpmes, err := d.sendRequest(ctx, p, mes) +func decodeDiagJson(data []byte) (*DiagInfo, error) { + di := new(DiagInfo) + err := json.Unmarshal(data, di) if err != nil { return nil, err } - return rpmes.GetData(), nil -} -func newMessage(diagID string) *pb.Message { - pmes := new(pb.Message) - pmes.DiagID = proto.String(diagID) - return pmes + return di, nil } -func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { +func (d *Diagnostics) getDiagnosticFromPeers(ctx context.Context, peers map[peer.ID]int, pmes *pb.Message) (<-chan *DiagInfo, error) { + respdata := make(chan *DiagInfo) + wg := sync.WaitGroup{} + for p, _ := range peers { + wg.Add(1) + log.Debugf("Sending diagnostic request to peer: %s", p) + go func(p peer.ID) { + defer wg.Done() + out, err := d.getDiagnosticFromPeer(ctx, p, pmes) + if err != nil { + log.Errorf("Error getting diagnostic from %s: %s", p, err) + return + } + for d := range out { + respdata <- d + } + }(p) + } + + go func() { + wg.Wait() + close(respdata) + }() + return respdata, nil +} + +func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (<-chan *DiagInfo, error) { s, err := d.host.NewStream(ProtocolDiag, p) if err != nil { return nil, err } - defer s.Close() - r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) - w := ggio.NewDelimitedWriter(s) + cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func + cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func + r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(cw) start := time.Now() @@ -229,72 +221,57 @@ func (d *Diagnostics) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Messa return nil, err } - rpmes := new(pb.Message) - if err := r.ReadMsg(rpmes); err != nil { - return nil, err - } - if rpmes == nil { - return nil, errors.New("no response to request") - } + out := make(chan *DiagInfo) + go func() { - rtt := time.Since(start) - log.Infof("diagnostic request took: %s", rtt.String()) - return rpmes, nil -} + defer func() { + close(out) + s.Close() + rtt := time.Since(start) + log.Infof("diagnostic request took: %s", rtt.String()) + }() -func (d *Diagnostics) handleDiagnostic(p peer.ID, pmes *pb.Message) (*pb.Message, error) { - log.Debugf("HandleDiagnostic from %s for id = %s", p, util.Key(pmes.GetDiagID()).B58String()) - resp := newMessage(pmes.GetDiagID()) - - // Make sure we havent already handled this request to prevent loops - d.diagLock.Lock() - _, found := d.diagMap[pmes.GetDiagID()] - if found { - d.diagLock.Unlock() - return resp, nil - } - d.diagMap[pmes.GetDiagID()] = time.Now() - d.diagLock.Unlock() - - buf := new(bytes.Buffer) - di := d.getDiagInfo() - buf.Write(di.Marshal()) - - ctx, _ := context.WithTimeout(context.TODO(), ResponseTimeout) + for { + rpmes := new(pb.Message) + if err := r.ReadMsg(rpmes); err != nil { + log.Errorf("Error reading diagnostic from stream: %s", err) + return + } + if rpmes == nil { + log.Error("Got no response back from diag request.") + return + } - respdata := make(chan []byte) - sendcount := 0 - for p, _ := range d.getPeers() { - log.Debugf("Sending diagnostic request to peer: %s", p) - sendcount++ - go func(p peer.ID) { - out, err := d.getDiagnosticFromPeer(ctx, p, pmes) + di, err := decodeDiagJson(rpmes.GetData()) if err != nil { - log.Errorf("getDiagnostic error: 
%v", err) - respdata <- nil + log.Error(err) return } - respdata <- out - }(p) - } - for i := 0; i < sendcount; i++ { - out := <-respdata - _, err := buf.Write(out) - if err != nil { - log.Errorf("getDiagnostic write output error: %v", err) - continue + select { + case out <- di: + case <-ctx.Done(): + return + } } - } - resp.Data = buf.Bytes() - return resp, nil + }() + + return out, nil +} + +func newMessage(diagID string) *pb.Message { + pmes := new(pb.Message) + pmes.DiagID = proto.String(diagID) + return pmes } func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { - r := ggio.NewDelimitedReader(s, 32768) // maxsize - w := ggio.NewDelimitedWriter(s) + cr := ctxutil.NewReader(ctx, s) + cw := ctxutil.NewWriter(ctx, s) + r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize + w := ggio.NewDelimitedWriter(cw) // deserialize msg pmes := new(pb.Message) @@ -307,28 +284,54 @@ func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error { log.Infof("[peer: %s] Got message from [%s]\n", d.self.Pretty(), s.Conn().RemotePeer()) - // dispatch handler. - p := s.Conn().RemotePeer() - rpmes, err := d.handleDiagnostic(p, pmes) - if err != nil { - log.Errorf("handleDiagnostic error: %s", err) + // Make sure we havent already handled this request to prevent loops + if err := d.startDiag(pmes.GetDiagID()); err != nil { return nil } - // if nil response, return it before serializing - if rpmes == nil { - return nil + resp := newMessage(pmes.GetDiagID()) + resp.Data = d.getDiagInfo().Marshal() + if err := w.WriteMsg(resp); err != nil { + log.Errorf("Failed to write protobuf message over stream: %s", err) + return err } - // serialize + send response msg - if err := w.WriteMsg(rpmes); err != nil { - log.Errorf("Failed to encode protobuf message: %v", err) - return nil + timeout := pmes.GetTimeoutDuration() + if timeout < HopTimeoutDecrement { + return fmt.Errorf("timeout too short: %s", timeout) + } + ctx, _ = context.WithTimeout(ctx, timeout) + pmes.SetTimeoutDuration(timeout - HopTimeoutDecrement) + + dpeers, err := d.getDiagnosticFromPeers(ctx, d.getPeers(), pmes) + if err != nil { + log.Errorf("diagnostic from peers err: %s", err) + return err + } + for b := range dpeers { + resp := newMessage(pmes.GetDiagID()) + resp.Data = b.Marshal() + if err := w.WriteMsg(resp); err != nil { + log.Errorf("Failed to write protobuf message over stream: %s", err) + return err + } } return nil } +func (d *Diagnostics) startDiag(id string) error { + d.diagLock.Lock() + _, found := d.diagMap[id] + if found { + d.diagLock.Unlock() + return ErrAlreadyRunning + } + d.diagMap[id] = time.Now() + d.diagLock.Unlock() + return nil +} + func (d *Diagnostics) handleNewStream(s inet.Stream) { d.HandleMessage(context.Background(), s) s.Close() diff --git a/diagnostics/internal/pb/diagnostics.pb.go b/diagnostics/internal/pb/diagnostics.pb.go index 4aa721711eb..0da512c3ec2 100644 --- a/diagnostics/internal/pb/diagnostics.pb.go +++ b/diagnostics/internal/pb/diagnostics.pb.go @@ -14,15 +14,18 @@ It has these top-level messages: package diagnostics_pb import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" import math "math" -// Reference imports to suppress errors if they are not otherwise used. +// Reference proto, json, and math imports to suppress error if they are not otherwise used. 
var _ = proto.Marshal +var _ = &json.SyntaxError{} var _ = math.Inf type Message struct { DiagID *string `protobuf:"bytes,1,req" json:"DiagID,omitempty"` Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"` + Timeout *int64 `protobuf:"varint,3,opt" json:"Timeout,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -44,5 +47,12 @@ func (m *Message) GetData() []byte { return nil } +func (m *Message) GetTimeout() int64 { + if m != nil && m.Timeout != nil { + return *m.Timeout + } + return 0 +} + func init() { } diff --git a/diagnostics/internal/pb/diagnostics.proto b/diagnostics/internal/pb/diagnostics.proto index 3ffe2f523ce..2202f7c246b 100644 --- a/diagnostics/internal/pb/diagnostics.proto +++ b/diagnostics/internal/pb/diagnostics.proto @@ -3,4 +3,5 @@ package diagnostics.pb; message Message { required string DiagID = 1; optional bytes Data = 2; + optional int64 Timeout = 3; // in nanoseconds } diff --git a/diagnostics/internal/pb/timeout.go b/diagnostics/internal/pb/timeout.go new file mode 100644 index 00000000000..f2043c0e7c3 --- /dev/null +++ b/diagnostics/internal/pb/timeout.go @@ -0,0 +1,14 @@ +package diagnostics_pb + +import ( + "time" +) + +func (m *Message) GetTimeoutDuration() time.Duration { + return time.Duration(m.GetTimeout()) +} + +func (m *Message) SetTimeoutDuration(t time.Duration) { + it := int64(t) + m.Timeout = &it +} diff --git a/exchange/reprovide/reprovide.go b/exchange/reprovide/reprovide.go index 4895e8ccbf1..1534b4ec4e5 100644 --- a/exchange/reprovide/reprovide.go +++ b/exchange/reprovide/reprovide.go @@ -30,7 +30,10 @@ func NewReprovider(rsys routing.IpfsRouting, bstore blocks.Blockstore) *Reprovid } func (rp *Reprovider) ProvideEvery(ctx context.Context, tick time.Duration) { - after := time.After(0) + // dont reprovide immediately. + // may have just started the daemon and shutting it down immediately. + // probability( up another minute | uptime ) increases with uptime. + after := time.After(time.Minute) for { select { case <-ctx.Done(): diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index 33f6e07b282..adf08711050 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -147,6 +147,11 @@ func reuseErrShouldRetry(err error) bool { return false // hey, it worked! no need to retry. } + // if it's a network timeout error, it's a legitimate failure. + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + return true + } + errno, ok := err.(syscall.Errno) if !ok { // not an errno? who knows what this is. retry. 
return true diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 3a0012287e9..c1c799b66ae 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -118,8 +118,8 @@ func (ids *IDService) ResponseHandler(s inet.Stream) { r := ggio.NewDelimitedReader(s, 2048) mes := pb.Identify{} if err := r.ReadMsg(&mes); err != nil { - log.Errorf("%s error receiving message from %s %s", ID, - c.RemotePeer(), c.RemoteMultiaddr()) + log.Errorf("%s error receiving message from %s %s %s", ID, + c.RemotePeer(), c.RemoteMultiaddr(), err) return } ids.consumeMessage(&mes, c) diff --git a/p2p/protocol/mux.go b/p2p/protocol/mux.go index 91f28413f66..e2d121f961d 100644 --- a/p2p/protocol/mux.go +++ b/p2p/protocol/mux.go @@ -115,7 +115,7 @@ func (m *Mux) HandleSync(s inet.Stream) { return } - log.Infof("muxer handle protocol: %s", name) + log.Infof("muxer handle protocol %s: %s", s.Conn().RemotePeer(), name) handler(s) } diff --git a/pin/indirect.go b/pin/indirect.go index 9e67bc2c9ff..09decbb2549 100644 --- a/pin/indirect.go +++ b/pin/indirect.go @@ -32,7 +32,7 @@ func loadIndirPin(d ds.Datastore, k ds.Key) (*indirectPin, error) { keys = append(keys, k) refcnt[k] = v } - log.Debugf("indirPin keys: %#v", keys) + // log.Debugf("indirPin keys: %#v", keys) return &indirectPin{blockset: set.SimpleSetFromKeys(keys), refCounts: refcnt}, nil } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 923c8c69b71..0fd5177a2ce 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -370,66 +370,3 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) { } } } - -// Bootstrap builds up list of peers by requesting random peer IDs -func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error { - var merr u.MultiErr - - randomID := func() peer.ID { - // 16 random bytes is not a valid peer id. it may be fine becuase - // the dht will rehash to its own keyspace anyway. - id := make([]byte, 16) - rand.Read(id) - return peer.ID(id) - } - - // bootstrap sequentially, as results will compound - runQuery := func(ctx context.Context, id peer.ID) { - p, err := dht.FindPeer(ctx, id) - if err == routing.ErrNotFound { - // this isn't an error. this is precisely what we expect. - } else if err != nil { - merr = append(merr, err) - } else { - // woah, actually found a peer with that ID? this shouldn't happen normally - // (as the ID we use is not a real ID). this is an odd error worth logging. - err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p) - log.Errorf("%s", err) - merr = append(merr, err) - } - } - - sequential := true - if sequential { - // these should be parallel normally. but can make them sequential for debugging. - // note that the core/bootstrap context deadline should be extended too for that. - for i := 0; i < queries; i++ { - id := randomID() - log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id) - runQuery(ctx, id) - } - - } else { - // note on parallelism here: the context is passed in to the queries, so they - // **should** exit when it exceeds, making this function exit on ctx cancel. - // normally, we should be selecting on ctx.Done() here too, but this gets - // complicated to do with WaitGroup, and doesnt wait for the children to exit. 
-		var wg sync.WaitGroup
-		for i := 0; i < queries; i++ {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
-				id := randomID()
-				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
-				runQuery(ctx, id)
-			}()
-		}
-		wg.Wait()
-	}
-
-	if len(merr) > 0 {
-		return merr
-	}
-	return nil
-}
diff --git a/routing/dht/dht_bootstrap.go b/routing/dht/dht_bootstrap.go
new file mode 100644
index 00000000000..c91df05e5f1
--- /dev/null
+++ b/routing/dht/dht_bootstrap.go
@@ -0,0 +1,158 @@
+// Package dht implements a distributed hash table that satisfies the ipfs routing
+// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
+package dht
+
+import (
+	"crypto/rand"
+	"fmt"
+	"sync"
+	"time"
+
+	peer "github.com/jbenet/go-ipfs/p2p/peer"
+	routing "github.com/jbenet/go-ipfs/routing"
+	u "github.com/jbenet/go-ipfs/util"
+
+	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
+	periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
+)
+
+// BootstrapConfig specifies parameters used when bootstrapping the DHT.
+//
+// Note there is a tradeoff between the bootstrap period and the
+// number of queries. We could support a higher period with fewer
+// queries.
+type BootstrapConfig struct {
+	Queries int           // how many queries to run per period
+	Period  time.Duration // how often to run the periodic bootstrap
+	Timeout time.Duration // how long to wait for a bootstrap query to run
+}
+
+var DefaultBootstrapConfig = BootstrapConfig{
+	// For now, this is set to 1 query.
+	// We are currently more interested in ensuring we have a properly formed
+	// DHT than making sure our dht minimizes traffic. Once we are more certain
+	// of our implementation's robustness, we should lower this down to 8 or 4.
+	Queries: 1,
+
+	// For now, this is set to 20 seconds, which is an aggressive period.
+	// We are currently more interested in ensuring we have a properly formed
+	// DHT than making sure our dht minimizes traffic. Once we are more certain
+	// of our implementation's robustness, we should lengthen this to 30s or 1m.
+	Period: time.Duration(20 * time.Second),
+
+	Timeout: time.Duration(20 * time.Second),
+}
+
+// Bootstrap ensures the dht routing table remains healthy as peers come and go.
+// It builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process will run a number of queries each time, and run every time the signal fires.
+// These parameters are configurable.
+//
+// Bootstrap returns a process, so the user can stop it.
+func (dht *IpfsDHT) Bootstrap(config BootstrapConfig) (goprocess.Process, error) {
+	sig := time.Tick(config.Period)
+	return dht.BootstrapOnSignal(config, sig)
+}
+
+// BootstrapOnSignal ensures the dht routing table remains healthy as peers come and go.
+// It builds up a list of peers by requesting random peer IDs. The Bootstrap
+// process will run a number of queries each time, and run every time the signal fires.
+// These parameters are configurable.
+//
+// BootstrapOnSignal returns a process, so the user can stop it.
+func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) { + if cfg.Queries <= 0 { + return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries) + } + + if signal == nil { + return nil, fmt.Errorf("invalid signal: %v", signal) + } + + proc := periodicproc.Ticker(signal, func(worker goprocess.Process) { + // it would be useful to be able to send out signals of when we bootstrap, too... + // maybe this is a good case for whole module event pub/sub? + + ctx := dht.Context() + if err := dht.runBootstrap(ctx, cfg); err != nil { + log.Error(err) + // A bootstrapping error is important to notice but not fatal. + } + }) + + return proc, nil +} + +// runBootstrap builds up list of peers by requesting random peer IDs +func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { + bslog := func(msg string) { + log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size()) + } + bslog("start") + defer bslog("end") + defer log.EventBegin(ctx, "dhtRunBootstrap").Done() + + var merr u.MultiErr + + randomID := func() peer.ID { + // 16 random bytes is not a valid peer id. it may be fine becuase + // the dht will rehash to its own keyspace anyway. + id := make([]byte, 16) + rand.Read(id) + id = u.Hash(id) + return peer.ID(id) + } + + // bootstrap sequentially, as results will compound + ctx, cancel := context.WithTimeout(ctx, cfg.Timeout) + defer cancel() + runQuery := func(ctx context.Context, id peer.ID) { + p, err := dht.FindPeer(ctx, id) + if err == routing.ErrNotFound { + // this isn't an error. this is precisely what we expect. + } else if err != nil { + merr = append(merr, err) + } else { + // woah, actually found a peer with that ID? this shouldn't happen normally + // (as the ID we use is not a real ID). this is an odd error worth logging. + err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p) + log.Errorf("%s", err) + merr = append(merr, err) + } + } + + sequential := true + if sequential { + // these should be parallel normally. but can make them sequential for debugging. + // note that the core/bootstrap context deadline should be extended too for that. + for i := 0; i < cfg.Queries; i++ { + id := randomID() + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) + runQuery(ctx, id) + } + + } else { + // note on parallelism here: the context is passed in to the queries, so they + // **should** exit when it exceeds, making this function exit on ctx cancel. + // normally, we should be selecting on ctx.Done() here too, but this gets + // complicated to do with WaitGroup, and doesnt wait for the children to exit. + var wg sync.WaitGroup + for i := 0; i < cfg.Queries; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + id := randomID() + log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id) + runQuery(ctx, id) + }() + } + wg.Wait() + } + + if len(merr) > 0 { + return merr + } + return nil +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 07211f5fea1..b7b9faf71d3 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -75,25 +75,22 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { ctx, cancel := context.WithCancel(ctx) + log.Debugf("bootstrapping dhts...") - rounds := 1 + // tried async. sequential fares much better. 
compare: + // 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2 + // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd + // probably because results compound - for i := 0; i < rounds; i++ { - log.Debugf("bootstrapping round %d/%d\n", i, rounds) + var cfg BootstrapConfig + cfg = DefaultBootstrapConfig + cfg.Queries = 3 - // tried async. sequential fares much better. compare: - // 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2 - // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd - // probably because results compound - - start := rand.Intn(len(dhts)) // randomize to decrease bias. - for i := range dhts { - dht := dhts[(start+i)%len(dhts)] - log.Debugf("bootstrapping round %d/%d -- %s\n", i, rounds, dht.self) - dht.Bootstrap(ctx, 3) - } + start := rand.Intn(len(dhts)) // randomize to decrease bias. + for i := range dhts { + dht := dhts[(start+i)%len(dhts)] + dht.runBootstrap(ctx, cfg) } - cancel() } @@ -235,6 +232,53 @@ func TestProvides(t *testing.T) { } } +// if minPeers or avgPeers is 0, dont test for it. +func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool { + // test "well-formed-ness" (>= minPeers peers in every routing table) + + checkTables := func() bool { + totalPeers := 0 + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + totalPeers += rtlen + if minPeers > 0 && rtlen < minPeers { + t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers) + return false + } + } + actualAvgPeers := totalPeers / len(dhts) + t.Logf("avg rt size: %d", actualAvgPeers) + if avgPeers > 0 && actualAvgPeers < avgPeers { + t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers) + return false + } + return true + } + + timeoutA := time.After(timeout) + for { + select { + case <-timeoutA: + log.Error("did not reach well-formed routing tables by %s", timeout) + return false // failed + case <-time.After(5 * time.Millisecond): + if checkTables() { + return true // succeeded + } + } + } +} + +func printRoutingTables(dhts []*IpfsDHT) { + // the routing tables should be full now. let's inspect them. + fmt.Println("checking routing table of %d", len(dhts)) + for _, dht := range dhts { + fmt.Printf("checking routing table of %s\n", dht.self) + dht.routingTable.Print() + fmt.Println("") + } +} + func TestBootstrap(t *testing.T) { // t.Skip("skipping test to debug another") if testing.Short() { @@ -258,38 +302,109 @@ func TestBootstrap(t *testing.T) { } <-time.After(100 * time.Millisecond) - t.Logf("bootstrapping them so they find each other", nDHTs) - ctxT, _ := context.WithTimeout(ctx, 5*time.Second) - bootstrap(t, ctxT, dhts) + // bootstrap a few times until we get good tables. + stop := make(chan struct{}) + go func() { + for { + t.Logf("bootstrapping them so they find each other", nDHTs) + ctxT, _ := context.WithTimeout(ctx, 5*time.Second) + bootstrap(t, ctxT, dhts) + + select { + case <-time.After(50 * time.Millisecond): + continue // being explicit + case <-stop: + return + } + } + }() + + waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second) + close(stop) if u.Debug { // the routing tables should be full now. let's inspect them. 
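Outside the tests, callers interact with the new bootstrap API through the process it returns rather than through runBootstrap directly. A minimal sketch of that call pattern follows; startBootstrapping is a hypothetical helper, not part of this change set.

package dht

import (
	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)

// startBootstrapping starts the periodic bootstrapper with a slightly more
// aggressive query count (the test above also uses 3 queries per round) and
// returns the process so the caller can Close() it on shutdown.
func startBootstrapping(d *IpfsDHT) (goprocess.Process, error) {
	cfg := DefaultBootstrapConfig
	cfg.Queries = 3

	// ticks every cfg.Period until the returned process is closed
	return d.Bootstrap(cfg)
}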
- <-time.After(5 * time.Second) - t.Logf("checking routing table of %d", nDHTs) - for _, dht := range dhts { - fmt.Printf("checking routing table of %s\n", dht.self) - dht.routingTable.Print() - fmt.Println("") + printRoutingTables(dhts) + } +} + +func TestPeriodicBootstrap(t *testing.T) { + // t.Skip("skipping test to debug another") + if testing.Short() { + t.SkipNow() + } + + ctx := context.Background() + + nDHTs := 30 + _, _, dhts := setupDHTS(ctx, nDHTs, t) + defer func() { + for i := 0; i < nDHTs; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + // signal amplifier + amplify := func(signal chan time.Time, other []chan time.Time) { + for t := range signal { + for _, s := range other { + s <- t + } + } + for _, s := range other { + close(s) } } - // test "well-formed-ness" (>= 3 peers in every routing table) - avgsize := 0 + signal := make(chan time.Time) + allSignals := []chan time.Time{} + + var cfg BootstrapConfig + cfg = DefaultBootstrapConfig + cfg.Queries = 5 + + // kick off periodic bootstrappers with instrumented signals. + for _, dht := range dhts { + s := make(chan time.Time) + allSignals = append(allSignals, s) + dht.BootstrapOnSignal(cfg, s) + } + go amplify(signal, allSignals) + + t.Logf("dhts are not connected.", nDHTs) + for _, dht := range dhts { + rtlen := dht.routingTable.Size() + if rtlen > 0 { + t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen) + } + } + + for i := 0; i < nDHTs; i++ { + connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)]) + } + + t.Logf("dhts are now connected to 1-2 others.", nDHTs) for _, dht := range dhts { rtlen := dht.routingTable.Size() - avgsize += rtlen - t.Logf("routing table for %s has %d peers", dht.self, rtlen) - if rtlen < 4 { - // currently, we dont have good bootstrapping guarantees. - // t.Errorf("routing table for %s only has %d peers", dht.self, rtlen) + if rtlen > 2 { + t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen) } } - avgsize = avgsize / len(dhts) - avgsizeExpected := 6 - t.Logf("avg rt size: %d", avgsize) - if avgsize < avgsizeExpected { - t.Errorf("avg rt size: %d < %d", avgsize, avgsizeExpected) + if u.Debug { + printRoutingTables(dhts) + } + + t.Logf("bootstrapping them so they find each other", nDHTs) + signal <- time.Now() + + // this is async, and we dont know when it's finished with one cycle, so keep checking + // until the routing tables look better, or some long timeout for the failure case. + waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second) + + if u.Debug { + printRoutingTables(dhts) } } @@ -319,7 +434,6 @@ func TestProvidesMany(t *testing.T) { if u.Debug { // the routing tables should be full now. let's inspect them. 
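TestPeriodicBootstrap drives many DHTs from one hand-rolled signal by fanning a single chan time.Time out to per-DHT channels. The same BootstrapOnSignal hook also supports purely on-demand bootstrapping; onDemandBootstrap below is a hypothetical sketch of that, not part of this change set.

package dht

import (
	"time"
)

// onDemandBootstrap hands the caller a trigger that starts one bootstrap
// round and a stop function that shuts the worker down. The send blocks
// until the worker is ready to start a round; do not call trigger after
// stop, since nothing drains the signal channel once the process is closed.
func onDemandBootstrap(d *IpfsDHT) (trigger func(), stop func() error, err error) {
	sig := make(chan time.Time)
	proc, err := d.BootstrapOnSignal(DefaultBootstrapConfig, sig)
	if err != nil {
		return nil, nil, err
	}

	trigger = func() { sig <- time.Now() }
	stop = proc.Close
	return trigger, stop, nil
}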
- <-time.After(5 * time.Second) t.Logf("checking routing table of %d", nDHTs) for _, dht := range dhts { fmt.Printf("checking routing table of %s\n", dht.self) diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index e151af5d246..ab756b5e4d6 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -49,6 +49,10 @@ func TestGetFailures(t *testing.T) { // u.POut("Timout Test\n") ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond) if _, err := d.GetValue(ctx1, u.Key("test")); err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } + if err != context.DeadlineExceeded { t.Fatal("Got different error than we expected", err) } @@ -86,6 +90,9 @@ func TestGetFailures(t *testing.T) { ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second) _, err = d.GetValue(ctx2, u.Key("test")) if err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } if err != routing.ErrNotFound { t.Fatalf("Expected ErrNotFound, got: %s", err) } @@ -202,6 +209,9 @@ func TestNotFound(t *testing.T) { v, err := d.GetValue(ctx, u.Key("hello")) log.Debugf("get value got %v", v) if err != nil { + if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 { + err = merr[0] + } switch err { case routing.ErrNotFound: //Success! diff --git a/routing/dht/query.go b/routing/dht/query.go index 53c232323c7..687d2621fd6 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -62,7 +62,7 @@ type dhtQueryRunner struct { peersRemaining todoctr.Counter // peersToQuery + currently processing result *dhtQueryResult // query result - errs []error // result errors. maybe should be a map[peer.ID]error + errs u.MultiErr // result errors. maybe should be a map[peer.ID]error rateLimit chan struct{} // processing semaphore log eventlog.EventLogger @@ -122,8 +122,12 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { r.RLock() defer r.RUnlock() - if len(r.errs) > 0 { - err = r.errs[0] // take the first? + err = routing.ErrNotFound + + // if every query to every peer failed, something must be very wrong. + if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() { + log.Debugf("query errs: %s", r.errs) + err = r.errs[0] } case <-r.cg.Closed(): diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 07d9ddc431f..2054e03fd0c 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -88,9 +88,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // get closest peers in the routing table rtp := dht.routingTable.ListPeers() log.Debugf("peers in rt: %s", len(rtp), rtp) - - closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) - if closest == nil || len(closest) == 0 { + if len(rtp) == 0 { log.Warning("No peers from routing table!") return nil, errors.Wrap(kb.ErrLookupFailure) } @@ -111,7 +109,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { }) // run it! 
- result, err := query.Run(ctx, closest) + result, err := query.Run(ctx, rtp) if err != nil { return nil, err } @@ -170,7 +168,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]peer.PeerIn // to the given key func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer.ID, error) { e := log.EventBegin(ctx, "getClosestPeers", &key) - tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + tablepeers := dht.routingTable.ListPeers() if len(tablepeers) == 0 { return nil, errors.Wrap(kb.ErrLookupFailure) } @@ -313,7 +311,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co return &dhtQueryResult{closerPeers: clpeers}, nil }) - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) + peers := dht.routingTable.ListPeers() _, err := query.Run(ctx, peers) if err != nil { log.Errorf("Query error: %s", err) @@ -329,13 +327,13 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er return pi, nil } - closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) - if closest == nil || len(closest) == 0 { + peers := dht.routingTable.ListPeers() + if len(peers) == 0 { return peer.PeerInfo{}, errors.Wrap(kb.ErrLookupFailure) } // Sanity... - for _, p := range closest { + for _, p := range peers { if p == id { log.Error("Found target peer in list of closest peers...") return dht.peerstore.PeerInfo(p), nil @@ -367,7 +365,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er }) // run it! - result, err := query.Run(ctx, closest) + result, err := query.Run(ctx, peers) if err != nil { return peer.PeerInfo{}, err } @@ -386,8 +384,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< peerchan := make(chan peer.PeerInfo, asyncQueryBuffer) peersSeen := peer.Set{} - closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) - if closest == nil || len(closest) == 0 { + peers := dht.routingTable.ListPeers() + if len(peers) == 0 { return nil, errors.Wrap(kb.ErrLookupFailure) } @@ -432,7 +430,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< // run it! run it asynchronously to gen peers as results are found. 
// this does no error checking go func() { - if _, err := query.Run(ctx, closest); err != nil { + if _, err := query.Run(ctx, peers); err != nil { log.Error(err) } diff --git a/test/epictest/addcat_test.go b/test/epictest/addcat_test.go index 047c312e941..9c5486ed62b 100644 --- a/test/epictest/addcat_test.go +++ b/test/epictest/addcat_test.go @@ -115,8 +115,15 @@ func DirectAddCat(data []byte, conf testutil.LatencyConfig) error { } defer catter.Close() - catter.Bootstrap(ctx, []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}) - adder.Bootstrap(ctx, []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}) + bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)} + bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)} + + if err := catter.Bootstrap(core.BootstrapConfigWithPeers(bs1)); err != nil { + return err + } + if err := adder.Bootstrap(core.BootstrapConfigWithPeers(bs2)); err != nil { + return err + } keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) if err != nil { diff --git a/test/epictest/three_legged_cat_test.go b/test/epictest/three_legged_cat_test.go index c86403ff70b..ff3f036a372 100644 --- a/test/epictest/three_legged_cat_test.go +++ b/test/epictest/three_legged_cat_test.go @@ -62,9 +62,15 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error { return err } defer bootstrap.Close() - boostrapInfo := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) - adder.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) - catter.Bootstrap(ctx, []peer.PeerInfo{boostrapInfo}) + + bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID()) + bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis}) + if err := adder.Bootstrap(bcfg); err != nil { + return err + } + if err := catter.Bootstrap(bcfg); err != nil { + return err + } keyAdded, err := coreunix.Add(adder, bytes.NewReader(data)) if err != nil {
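The epictest changes show the new core bootstrap call shape: Bootstrap now takes a config built from peer infos instead of a context plus peer slice, and its error must be checked. A minimal sketch of that pattern follows; bootstrapTo is a hypothetical helper, and the *core.IpfsNode parameter type is assumed from how adder and catter are used above.

package epictest

import (
	core "github.com/jbenet/go-ipfs/core"
	peer "github.com/jbenet/go-ipfs/p2p/peer"
)

// bootstrapTo points a node at a set of bootstrap peers and surfaces the
// error, mirroring the call sites rewritten above.
func bootstrapTo(n *core.IpfsNode, peers []peer.PeerInfo) error {
	return n.Bootstrap(core.BootstrapConfigWithPeers(peers))
}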