Skip to content

Commit

Permalink
swarm: integrate new dialsync code into swarm
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Sep 8, 2016
1 parent e26950f commit 126e150
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 116 deletions.
1 change: 0 additions & 1 deletion p2p/net/swarm/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,6 @@ func TestDialBackoff(t *testing.T) {
if !s1.backf.Backoff(s3p) {
t.Error("s3 should be on backoff")
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package swarm

import (
"context"
"sync"

peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
context "golang.org/x/net/context"

conn "github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/limiter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package swarm

import (
"context"
"fmt"
"math/rand"
"strconv"
Expand All @@ -10,7 +11,6 @@ import (
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
context "golang.org/x/net/context"

conn "github.com/libp2p/go-libp2p/p2p/net/conn"
)
Expand Down
5 changes: 3 additions & 2 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package swarm

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -32,7 +33,6 @@ import (
yamux "github.com/whyrusleeping/go-smux-yamux"
mafilter "github.com/whyrusleeping/multiaddr-filter"
ws "github.com/whyrusleeping/ws-transport"
context "golang.org/x/net/context"
)

var log = logging.Logger("swarm2")
Expand Down Expand Up @@ -76,7 +76,7 @@ type Swarm struct {
peers pstore.Peerstore
connh ConnHandler

dsync dialsync
dsync *DialSync
backf dialbackoff
dialT time.Duration // mainly for tests

Expand Down Expand Up @@ -134,6 +134,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
}

s.dsync = NewDialSync(s.doDial)
s.limiter = newDialLimiter(s.dialAddr)

// configure Swarm
Expand Down
135 changes: 24 additions & 111 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package swarm

import (
"context"
"errors"
"fmt"
"sync"
Expand All @@ -11,7 +12,6 @@ import (
ma "github.com/jbenet/go-multiaddr"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
context "golang.org/x/net/context"
)

// Diagram of dial sync:
Expand Down Expand Up @@ -53,78 +53,6 @@ const defaultPerPeerRateLimit = 8
// subcomponent of Dial)
var DialTimeout = time.Second * 10

// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
//
// this interface is so would-be dialers can just:
//
// for {
// c := findConnectionToPeer(peer)
// if c != nil {
// return c
// }
//
// // ok, no connections. should we dial?
// if ok, wait := dialsync.Lock(peer); !ok {
// <-wait // can optionally wait
// continue
// }
// defer dialsync.Unlock(peer)
//
// c := actuallyDial(peer)
// return c
// }
//
type dialsync struct {
// ongoing is a map of tickets for the current peers being dialed.
// this way, we dont kick off N dials simultaneously.
ongoing map[peer.ID]chan struct{}
lock sync.Mutex
}

// Lock governs the beginning of a dial attempt.
// If there are no ongoing dials, it returns true, and the client is now
// scheduled to dial. Every other goroutine that calls startDial -- with
//the same dst -- will block until client is done. The client MUST call
// ds.Unlock(p) when it is done, to unblock the other callers.
// The client is not reponsible for achieving a successful dial, only for
// reporting the end of the attempt (calling ds.Unlock(p)).
//
// see the example below `dialsync`
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
ds.lock.Lock()
if ds.ongoing == nil { // init if not ready
ds.ongoing = make(map[peer.ID]chan struct{})
}
wait, found := ds.ongoing[dst]
if !found {
ds.ongoing[dst] = make(chan struct{})
}
ds.lock.Unlock()

if found {
return false, wait
}

// ok! you're signed up to dial!
return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
ds.lock.Lock()
wait, found := ds.ongoing[dst]
if !found {
panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
}

delete(ds.ongoing, dst) // remove ongoing dial
close(wait) // release everyone else
ds.lock.Unlock()
}

// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
Expand Down Expand Up @@ -246,8 +174,7 @@ func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
// gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
// dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()
defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()

// check if we already have an open connection first
conn := s.bestConnectionToPeer(p)
Expand All @@ -257,48 +184,34 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)

// if this peer has been backed off, lets get out of here
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", logdial)
log.Event(ctx, "swarmDialBackoff", p)
return nil, ErrDialBackoff
}

// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); ok {
defer s.dsync.Unlock(p)

// ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p)
cancel()
log.Debugf("dial end %s", conn)
if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff

// ok, we failed. try again. (if loop is done, our error is output)
return nil, fmt.Errorf("dial attempt failed: %s", err)
}
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil
return s.dsync.DialLock(ctx, p)
}

} else {
// we did not dial. we must wait for someone else to dial.
defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
select {
case <-wait: // wait for that other dial to finish.
// doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
// ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p)
cancel()
log.Debugf("dial end %s", conn)
if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff

// see if it worked, OR we got an incoming dial in the meantime...
conn := s.bestConnectionToPeer(p)
if conn != nil {
return conn, nil
}
return nil, ErrDialFailed
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
}
// ok, we failed. try again. (if loop is done, our error is output)
return nil, fmt.Errorf("dial attempt failed: %s", err)
}
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil
}

// dial is the actual swarm's dial logic, gated by Dial.
Expand Down

0 comments on commit 126e150

Please sign in to comment.