Skip to content

Commit

Permalink
Merge pull request #72 from libp2p/misc/cleanup-limiter
Browse files Browse the repository at this point in the history
limiter: cleanup the code
  • Loading branch information
Stebalien authored Jun 11, 2018
2 parents 1c4ee32 + af53412 commit 8d05824
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 52 deletions.
107 changes: 57 additions & 50 deletions p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ func (dj *dialJob) cancelled() bool {
}

type dialLimiter struct {
rllock sync.Mutex
lk sync.Mutex

fdConsuming int
fdLimit int
waitingOnFd []*dialJob

dialFunc func(context.Context, peer.ID, ma.Multiaddr) (transport.Conn, error)
dialFunc dialfunc

activePerPeer map[peer.ID]int
perPeerLimit int
Expand All @@ -62,26 +63,26 @@ func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimit
}
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()

if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--

if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd[0] = nil // clear out memory
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++

go dl.executeDial(next)
// freeFDToken frees FD token and if there are any schedules another waiting dialJob
// in it's place
func (dl *dialLimiter) freeFDToken() {
dl.fdConsuming--

if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd[0] = nil // clear out memory
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++

// we already have activePerPeer token at this point so we can just dial
go dl.executeDial(next)
}
}

func (dl *dialLimiter) freePeerToken(dj *dialJob) {
// release tokens in reverse order than we take them
dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
Expand All @@ -91,45 +92,31 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]

if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
delete(dl.waitingOnPeerLimit, next.peer)
} else {
waitlist[0] = nil // clear out memory
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
dl.waitingOnPeerLimit[next.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

if addrutil.IsFDCostlyTransport(next.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, next)
return
}

// take token
dl.fdConsuming++
}
dl.activePerPeer[next.peer]++ // just kidding, we still want this token

// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
dl.addCheckFdLimit(next)
}
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()

if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.freeFDToken()
}
dl.activePerPeer[dj.peer]++

dl.freePeerToken(dj)
}

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
Expand All @@ -140,15 +127,35 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.fdConsuming++
}

// take second needed token and start dial!
go dl.executeDial(dj)
}

func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++

dl.addCheckFdLimit(dj)
}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()

dl.addCheckPeerLimit(dj)
}

func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
dl.lk.Lock()
defer dl.lk.Unlock()
delete(dl.waitingOnPeerLimit, p)
// NB: the waitingOnFd list doesnt need to be cleaned out here, we will
// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
// remove them as we encounter them because they are 'cancelled' at this
// point
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,9 @@ func TestFDLimitUnderflow(t *testing.T) {

time.Sleep(time.Second * 3)

l.rllock.Lock()
l.lk.Lock()
fdConsuming := l.fdConsuming
l.rllock.Unlock()
l.lk.Unlock()

if fdConsuming < 0 {
t.Fatalf("l.fdConsuming < 0")
Expand Down

0 comments on commit 8d05824

Please sign in to comment.