Skip to content

Commit

Permalink
feat(connecteventmanager): block Connected() until accepted
Browse files Browse the repository at this point in the history
Ref: #432

Minimal attempt at solving #432
  • Loading branch information
rvagg committed Aug 16, 2023
1 parent b96767c commit 73c4112
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ The following emojis are used to highlight certain changes:

- Removed mentions of unused ARC algorithm ([#336](https://github.com/ipfs/boxo/issues/366#issuecomment-1597253540))
- Handle `_redirects` file when `If-None-Match` header is present ([#412](https://github.com/ipfs/boxo/pull/412))
- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))

### Security

Expand Down
78 changes: 62 additions & 16 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,38 @@ type connectEventManager struct {

type peerState struct {
newState, curState state
pending bool
accepted chan struct{}
}

func newPeerState() *peerState {
return &peerState{accepted: make(chan struct{})}

Check warning on line 39 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L38-L39

Added lines #L38 - L39 were not covered by tests
}

func (p *peerState) isPending() bool {
select {
case <-p.accepted:
return false
default:

Check warning on line 46 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L42-L46

Added lines #L42 - L46 were not covered by tests
}
return true

Check warning on line 48 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L48

Added line #L48 was not covered by tests
}

func (p *peerState) accept() {
select {
case <-p.accepted:
default:
close(p.accepted)

Check warning on line 55 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L51-L55

Added lines #L51 - L55 were not covered by tests
}
}

func (p *peerState) setPending() {
if !p.isPending() {
p.accepted = make(chan struct{})
}

Check warning on line 62 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}

func (p *peerState) waitAccept() {
<-p.accepted

Check warning on line 66 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
Expand Down Expand Up @@ -61,22 +92,33 @@ func (c *connectEventManager) Stop() {
func (c *connectEventManager) getState(p peer.ID) state {
if state, ok := c.peers[p]; ok {
return state.newState
} else {
return stateDisconnected
}
return stateDisconnected

Check warning on line 96 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L96

Added line #L96 was not covered by tests
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
func (c *connectEventManager) setState(p peer.ID, newState state) bool {
state, isExisting := c.peers[p]
if !isExisting {
state = newPeerState()

Check warning on line 102 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L99-L102

Added lines #L99 - L102 were not covered by tests
c.peers[p] = state
}
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()
if state.newState != state.curState {
if !isExisting || !state.isPending() {
if isExisting {
state.setPending()
}
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()

Check warning on line 112 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L106-L112

Added lines #L106 - L112 were not covered by tests
}
return true

Check warning on line 114 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L114

Added line #L114 was not covered by tests
}
return false

Check warning on line 116 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L116

Added line #L116 was not covered by tests
}

func (c *connectEventManager) waitStateAccept(p peer.ID) {
if state, ok := c.peers[p]; ok {
state.waitAccept()

Check warning on line 121 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}
}

Expand Down Expand Up @@ -109,11 +151,9 @@ func (c *connectEventManager) worker() {
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
state.accept()

Check warning on line 156 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L156

Added line #L156 was not covered by tests
continue
}

Expand All @@ -124,6 +164,7 @@ func (c *connectEventManager) worker() {
switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
state.accept()

Check warning on line 167 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L167

Added line #L167 was not covered by tests
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
Expand All @@ -142,20 +183,25 @@ func (c *connectEventManager) worker() {
}
c.lk.Lock()
}
state.accept()

Check warning on line 186 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L186

Added line #L186 was not covered by tests
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()

Check warning on line 197 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L197

Added line #L197 was not covered by tests
return
}
c.setState(p, stateResponsive)
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
if wait {
c.waitStateAccept(p)
}

Check warning on line 204 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L200-L204

Added lines #L200 - L204 were not covered by tests
}

// Called when we drop the final connection to a peer.
Expand Down

0 comments on commit 73c4112

Please sign in to comment.