Skip to content
This repository has been archived by the owner on Sep 9, 2022. It is now read-only.

Commit

Permalink
call Stream.Reset instead of Stream.Close
Browse files Browse the repository at this point in the history
May fix ipfs/kubo#6237

Basically,

1. We hang while closing a stream (because `Close` waits).
2. This blocks the connection manager because it assumes that close _doesn't_ wait.

This may also fix a stream leak.
  • Loading branch information
Stebalien authored and vyzo committed May 22, 2019
1 parent 0ca0e1a commit e972c1f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 9 deletions.
35 changes: 30 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package relay
import (
"fmt"
"net"
"time"

inet "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore"
Expand All @@ -11,7 +12,7 @@ import (
)

type Conn struct {
inet.Stream
stream inet.Stream
remote pstore.PeerInfo
}

Expand All @@ -28,29 +29,53 @@ func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}

func (c *Conn) Close() error {
return c.stream.Reset()
}

func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}

func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}

func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}

func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}

func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}

func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.Conn().RemotePeer().Pretty(),
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}

// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
proto := ma.ProtocolWithCode(ma.P_P2P).Name
peerid := c.Conn().RemotePeer().Pretty()
peerid := c.stream.Conn().RemotePeer().Pretty()
p2paddr := ma.StringCast(fmt.Sprintf("/%s/%s", proto, peerid))

circaddr := ma.Cast(ma.CodeToVarint(P_CIRCUIT))
return p2paddr.Encapsulate(circaddr)
}

func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.Conn().LocalMultiaddr()
return c.stream.Conn().LocalMultiaddr()
}

func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.Conn().LocalMultiaddr())
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
Expand Down
4 changes: 2 additions & 2 deletions listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func (r *Relay) Listener() *RelayListener {
func (l *RelayListener) Accept() (manet.Conn, error) {
select {
case c := <-l.incoming:
err := l.Relay().writeResponse(c.Stream, pb.CircuitRelay_SUCCESS)
err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
c.Stream.Reset()
c.stream.Reset()
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore
return nil, RelayError{msg.GetCode()}
}

return &Conn{Stream: s, remote: dest}, nil
return &Conn{stream: s, remote: dest}, nil
}

func (r *Relay) Matches(addr ma.Multiaddr) bool {
Expand Down Expand Up @@ -438,7 +438,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
}

select {
case r.incoming <- &Conn{Stream: s, remote: src}:
case r.incoming <- &Conn{stream: s, remote: src}:
case <-time.After(RelayAcceptTimeout):
r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED)
}
Expand Down

0 comments on commit e972c1f

Please sign in to comment.