Skip to content

Commit

Permalink
Merge pull request #75 from libp2p/feat/conn-tags
Browse files Browse the repository at this point in the history
Tag peers with live hop streams
  • Loading branch information
Stebalien authored May 7, 2019
2 parents 23f5535 + 66dfe16 commit 7c12b6d
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion p2p/protocol/internal/circuitv1-deprecated/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,15 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ..

func (r *Relay) addLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, 1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v + 1 })
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v + 1 })
}

func (r *Relay) rmLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, -1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v - 1 })
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v - 1 })

}

func (r *Relay) GetActiveHops() int32 {
Expand Down Expand Up @@ -364,10 +369,18 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {

r.addLiveHop(src.ID, dst.ID)

goroutines := new(int32)
*goroutines = 2
done := func() {
if atomic.AddInt32(goroutines, -1) == 0 {
r.rmLiveHop(src.ID, dst.ID)
}
}

// Don't reset streams after finishing or the other side will get an
// error, not an EOF.
go func() {
defer r.rmLiveHop(src.ID, dst.ID)
defer done()

buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)
Expand All @@ -386,6 +399,8 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
}()

go func() {
defer done()

buf := pool.Get(HopStreamBufferSize)
defer pool.Put(buf)

Expand Down

0 comments on commit 7c12b6d

Please sign in to comment.