From 1224606dd05f9f70bb3f4944113011bb3c00f441 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 7 May 2019 21:13:59 +0300 Subject: [PATCH 1/2] tag peers with live hop streams --- .../internal/circuitv1-deprecated/relay.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 81d5d0a8b1..646df850cd 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -118,10 +118,15 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. func (r *Relay) addLiveHop(from, to peer.ID) { atomic.AddInt32(&r.liveHopCount, 1) + r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v + 1 }) + r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v + 1 }) } func (r *Relay) rmLiveHop(from, to peer.ID) { atomic.AddInt32(&r.liveHopCount, -1) + r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v - 1 }) + r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v - 1 }) + } func (r *Relay) GetActiveHops() int32 { @@ -364,10 +369,13 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { r.addLiveHop(src.ID, dst.ID) + var wg sync.WaitGroup + wg.Add(2) + // Don't reset streams after finishing or the other side will get an // error, not an EOF. go func() { - defer r.rmLiveHop(src.ID, dst.ID) + defer wg.Done() buf := pool.Get(HopStreamBufferSize) defer pool.Put(buf) @@ -386,6 +394,8 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { }() go func() { + defer wg.Done() + buf := pool.Get(HopStreamBufferSize) defer pool.Put(buf) @@ -401,6 +411,11 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { } log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty()) }() + + go func() { + wg.Wait() + r.rmLiveHop(src.ID, dst.ID) + }() } func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) { From 66dfe163ba40cd72c3bd10bb6ddea17a3286117f Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 7 May 2019 22:19:18 +0300 Subject: [PATCH 2/2] don't use an extra goroutine for live hop cleanup --- .../internal/circuitv1-deprecated/relay.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 646df850cd..0afcf0467a 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -369,13 +369,18 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { r.addLiveHop(src.ID, dst.ID) - var wg sync.WaitGroup - wg.Add(2) + goroutines := new(int32) + *goroutines = 2 + done := func() { + if atomic.AddInt32(goroutines, -1) == 0 { + r.rmLiveHop(src.ID, dst.ID) + } + } // Don't reset streams after finishing or the other side will get an // error, not an EOF. go func() { - defer wg.Done() + defer done() buf := pool.Get(HopStreamBufferSize) defer pool.Put(buf) @@ -394,7 +399,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { }() go func() { - defer wg.Done() + defer done() buf := pool.Get(HopStreamBufferSize) defer pool.Put(buf) @@ -411,11 +416,6 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { } log.Debugf("relayed %d bytes from %s to %s", count, src.ID.Pretty(), dst.ID.Pretty()) }() - - go func() { - wg.Wait() - r.rmLiveHop(src.ID, dst.ID) - }() } func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {