From 126e150674e8cac04f1ba126bc961147aaf276ea Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 8 Sep 2016 09:59:28 -0700 Subject: [PATCH] swarm: integrate new dialsync code into swarm --- p2p/net/swarm/dial_test.go | 1 - p2p/net/swarm/limiter.go | 2 +- p2p/net/swarm/limiter_test.go | 2 +- p2p/net/swarm/swarm.go | 5 +- p2p/net/swarm/swarm_dial.go | 135 ++++++---------------------------- 5 files changed, 29 insertions(+), 116 deletions(-) diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index cfdb996c28..aeeb6ae42d 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -415,7 +415,6 @@ func TestDialBackoff(t *testing.T) { if !s1.backf.Backoff(s3p) { t.Error("s3 should be on backoff") } - } } diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go index 4bbfb11862..c34954a527 100644 --- a/p2p/net/swarm/limiter.go +++ b/p2p/net/swarm/limiter.go @@ -1,11 +1,11 @@ package swarm import ( + "context" "sync" peer "github.com/ipfs/go-libp2p-peer" ma "github.com/jbenet/go-multiaddr" - context "golang.org/x/net/context" conn "github.com/libp2p/go-libp2p/p2p/net/conn" addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr" diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go index f5fc18746f..93761a5048 100644 --- a/p2p/net/swarm/limiter_test.go +++ b/p2p/net/swarm/limiter_test.go @@ -1,6 +1,7 @@ package swarm import ( + "context" "fmt" "math/rand" "strconv" @@ -10,7 +11,6 @@ import ( peer "github.com/ipfs/go-libp2p-peer" ma "github.com/jbenet/go-multiaddr" mafmt "github.com/whyrusleeping/mafmt" - context "golang.org/x/net/context" conn "github.com/libp2p/go-libp2p/p2p/net/conn" ) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index abab8e8a43..e36f763fb9 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -3,6 +3,7 @@ package swarm import ( + "context" "fmt" "io/ioutil" "os" @@ -32,7 +33,6 @@ import ( yamux "github.com/whyrusleeping/go-smux-yamux" mafilter "github.com/whyrusleeping/multiaddr-filter" ws "github.com/whyrusleeping/ws-transport" - context "golang.org/x/net/context" ) var log = logging.Logger("swarm2") @@ -76,7 +76,7 @@ type Swarm struct { peers pstore.Peerstore connh ConnHandler - dsync dialsync + dsync *DialSync backf dialbackoff dialT time.Duration // mainly for tests @@ -134,6 +134,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, dialer: conn.NewDialer(local, peers.PrivKey(local), wrap), } + s.dsync = NewDialSync(s.doDial) s.limiter = newDialLimiter(s.dialAddr) // configure Swarm diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index eaf8d1f63a..573651778b 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -1,6 +1,7 @@ package swarm import ( + "context" "errors" "fmt" "sync" @@ -11,7 +12,6 @@ import ( ma "github.com/jbenet/go-multiaddr" conn "github.com/libp2p/go-libp2p/p2p/net/conn" addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr" - context "golang.org/x/net/context" ) // Diagram of dial sync: @@ -53,78 +53,6 @@ const defaultPerPeerRateLimit = 8 // subcomponent of Dial) var DialTimeout = time.Second * 10 -// dialsync is a small object that helps manage ongoing dials. -// this way, if we receive many simultaneous dial requests, one -// can do its thing, while the rest wait. -// -// this interface is so would-be dialers can just: -// -// for { -// c := findConnectionToPeer(peer) -// if c != nil { -// return c -// } -// -// // ok, no connections. should we dial? -// if ok, wait := dialsync.Lock(peer); !ok { -// <-wait // can optionally wait -// continue -// } -// defer dialsync.Unlock(peer) -// -// c := actuallyDial(peer) -// return c -// } -// -type dialsync struct { - // ongoing is a map of tickets for the current peers being dialed. - // this way, we dont kick off N dials simultaneously. - ongoing map[peer.ID]chan struct{} - lock sync.Mutex -} - -// Lock governs the beginning of a dial attempt. -// If there are no ongoing dials, it returns true, and the client is now -// scheduled to dial. Every other goroutine that calls startDial -- with -//the same dst -- will block until client is done. The client MUST call -// ds.Unlock(p) when it is done, to unblock the other callers. -// The client is not reponsible for achieving a successful dial, only for -// reporting the end of the attempt (calling ds.Unlock(p)). -// -// see the example below `dialsync` -func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) { - ds.lock.Lock() - if ds.ongoing == nil { // init if not ready - ds.ongoing = make(map[peer.ID]chan struct{}) - } - wait, found := ds.ongoing[dst] - if !found { - ds.ongoing[dst] = make(chan struct{}) - } - ds.lock.Unlock() - - if found { - return false, wait - } - - // ok! you're signed up to dial! - return true, nil -} - -// Unlock releases waiters to a dial attempt. see Lock. -// if Unlock(p) is called without calling Lock(p) first, Unlock panics. -func (ds *dialsync) Unlock(dst peer.ID) { - ds.lock.Lock() - wait, found := ds.ongoing[dst] - if !found { - panic("called dialDone with no ongoing dials to peer: " + dst.Pretty()) - } - - delete(ds.ongoing, dst) // remove ongoing dial - close(wait) // release everyone else - ds.lock.Unlock() -} - // dialbackoff is a struct used to avoid over-dialing the same, dead peers. // Whenever we totally time out on a peer (all three attempts), we add them // to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they @@ -246,8 +174,7 @@ func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn { // gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's // dial synchronization systems: dialsync and dialbackoff. func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) { - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done() + defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done() // check if we already have an open connection first conn := s.bestConnectionToPeer(p) @@ -257,48 +184,34 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) // if this peer has been backed off, lets get out of here if s.backf.Backoff(p) { - log.Event(ctx, "swarmDialBackoff", logdial) + log.Event(ctx, "swarmDialBackoff", p) return nil, ErrDialBackoff } - // check if there's an ongoing dial to this peer - if ok, wait := s.dsync.Lock(p); ok { - defer s.dsync.Unlock(p) - - // ok, we have been charged to dial! let's do it. - // if it succeeds, dial will add the conn to the swarm itself. - defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() - ctxT, cancel := context.WithTimeout(ctx, s.dialT) - conn, err := s.dial(ctxT, p) - cancel() - log.Debugf("dial end %s", conn) - if err != nil { - log.Event(ctx, "swarmDialBackoffAdd", logdial) - s.backf.AddBackoff(p) // let others know to backoff - - // ok, we failed. try again. (if loop is done, our error is output) - return nil, fmt.Errorf("dial attempt failed: %s", err) - } - log.Event(ctx, "swarmDialBackoffClear", logdial) - s.backf.Clear(p) // okay, no longer need to backoff - return conn, nil + return s.dsync.DialLock(ctx, p) +} - } else { - // we did not dial. we must wait for someone else to dial. - defer log.EventBegin(ctx, "swarmDialWait", logdial).Done() - select { - case <-wait: // wait for that other dial to finish. +// doDial is an ugly shim method to retain all the logging and backoff logic +// of the old dialsync code +func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { + var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) + // ok, we have been charged to dial! let's do it. + // if it succeeds, dial will add the conn to the swarm itself. + defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() + ctxT, cancel := context.WithTimeout(ctx, s.dialT) + conn, err := s.dial(ctxT, p) + cancel() + log.Debugf("dial end %s", conn) + if err != nil { + log.Event(ctx, "swarmDialBackoffAdd", logdial) + s.backf.AddBackoff(p) // let others know to backoff - // see if it worked, OR we got an incoming dial in the meantime... - conn := s.bestConnectionToPeer(p) - if conn != nil { - return conn, nil - } - return nil, ErrDialFailed - case <-ctx.Done(): // or we may have to bail... - return nil, ctx.Err() - } + // ok, we failed. try again. (if loop is done, our error is output) + return nil, fmt.Errorf("dial attempt failed: %s", err) } + log.Event(ctx, "swarmDialBackoffClear", logdial) + s.backf.Clear(p) // okay, no longer need to backoff + return conn, nil } // dial is the actual swarm's dial logic, gated by Dial.