Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
use the ResourceManager
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 14, 2022
1 parent fb190dd commit e760df1
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 21 deletions.
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ module github.com/libp2p/go-tcp-transport
go 1.16

require (
github.com/golang/mock v1.6.0
github.com/ipfs/go-log/v2 v2.5.0
github.com/libp2p/go-conn-security-multistream v0.3.0
github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36
github.com/libp2p/go-libp2p-mplex v0.4.1
github.com/libp2p/go-libp2p-testing v0.5.0
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0
github.com/libp2p/go-libp2p-core v0.13.1-0.20220114101623-6b8d8bf59647
github.com/libp2p/go-libp2p-mplex v0.4.2-0.20220113124821-8bd3fd12e637
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220114111157-d4fb83f89618
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220114112005-92eaefd089ab
github.com/libp2p/go-netroute v0.1.5 // indirect
github.com/libp2p/go-reuseport v0.1.0
github.com/libp2p/go-reuseport-transport v0.1.0
Expand Down
25 changes: 14 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -163,13 +165,13 @@ github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kP
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g=
github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo=
github.com/ipfs/go-log/v2 v2.5.0 h1:+MhAooFd9XZNvR0i9FriKW6HB0ql7HNXUuflWtc0dd4=
github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -205,20 +207,21 @@ github.com/libp2p/go-conn-security-multistream v0.3.0/go.mod h1:EEP47t4fw/bTelVm
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg=
github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36 h1:b/pMmgc5EV+dqSc+MjkX5xPa1nV6EKiOb0L0XT03Lic=
github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-core v0.13.1-0.20220113111946-0391e674afe4/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
github.com/libp2p/go-libp2p-core v0.13.1-0.20220114101623-6b8d8bf59647 h1:r72wIR0ywdBUSTodR3WyY1Fdb9FUCHcYGT8NWxQH7wY=
github.com/libp2p/go-libp2p-core v0.13.1-0.20220114101623-6b8d8bf59647/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE=
github.com/libp2p/go-libp2p-mplex v0.4.2-0.20220113124821-8bd3fd12e637 h1:yYLuo4pZ4JiKojIFkGYnxJ1Nrdr6qZX6jPrjjG5wKio=
github.com/libp2p/go-libp2p-mplex v0.4.2-0.20220113124821-8bd3fd12e637/go.mod h1:avzhJCZQaBxFEPplk5OTe5C9njPax2LRXBZHuOWwpsw=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc=
github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0=
github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OMbRi0/QsvE=
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0 h1:eD/QJCpcImYOUl6MdBuxMByVaEe5VMm463zJG6oUg9o=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0/go.mod h1:ByIyNe8asQhgcyIHetb4f+UgV+hDrA8pQ3L/TgNs+RI=
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220113123347-8dda553fe7f3/go.mod h1:VeazaHJZ7jwgje7xyb46LwP+9AhvIYliIq1FE/vuTMk=
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220114111157-d4fb83f89618 h1:vzHomenj6MtCKN0OwFyCha/MA+9P6TO1p4Tvvj4kM/g=
github.com/libp2p/go-libp2p-testing v0.6.1-0.20220114111157-d4fb83f89618/go.mod h1:/UYj9SOREV7S5U7QUhp7mZnAmu3N+apRuyqIp+hMd7M=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220114112005-92eaefd089ab h1:uWnpS1h44xaoMghNV/cyhLpIUGWwvrMOIMAXi1NplJs=
github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220114112005-92eaefd089ab/go.mod h1:WJv0GCzEmy+wJlAf9gKtGDHt8cZ0UVS+Y6ubHUHu2Sw=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
github.com/libp2p/go-mplex v0.3.0 h1:U1T+vmCYJaEoDJPV1aq31N56hS+lJgb397GsylNSgrU=
github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
Expand Down Expand Up @@ -427,7 +430,6 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -560,6 +562,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
27 changes: 25 additions & 2 deletions tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (ll *tcpListener) Accept() (manet.Conn, error) {
}
tryLinger(c, ll.sec)
tryKeepAlive(c, true)
// We're not calling OpenConnection in the resource manager here,
// since the manet.Conn doesn't allow us to save the scope.
// It's the caller's (usually the go-libp2p-transport-upgrader) responsibility
// to call the resource manager.
return c, nil
}

Expand All @@ -94,6 +98,7 @@ func DisableReuseport() Option {
return nil
}
}

func WithConnectionTimeout(d time.Duration) Option {
return func(tr *TcpTransport) error {
tr.connectTimeout = d
Expand All @@ -113,17 +118,23 @@ type TcpTransport struct {
// TCP connect timeout
connectTimeout time.Duration

rcmgr network.ResourceManager

reuse rtpt.Transport
}

var _ transport.Transport = &TcpTransport{}

// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire TCP stack (though it might not necessarily be).
func NewTCPTransport(upgrader transport.Upgrader, opts ...Option) (*TcpTransport, error) {
func NewTCPTransport(upgrader transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*TcpTransport, error) {
if rcmgr == nil {
rcmgr = network.NullResourceManager
}
tr := &TcpTransport{
Upgrader: upgrader,
connectTimeout: defaultConnectTimeout, // can be set by using the WithConnectionTimeout option
rcmgr: rcmgr,
}
for _, o := range opts {
if err := o(tr); err != nil {
Expand Down Expand Up @@ -158,8 +169,19 @@ func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Co

// Dial dials the peer at the remote address.
func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
connScope, err := t.rcmgr.OpenConnection(network.DirOutbound, true)
if err != nil {
log.Debugw("resource manager blocked outgoing connection", "peer", p, "addr", raddr, "error", err)
return nil, err
}
if err := connScope.SetPeer(p); err != nil {
log.Debugw("resource manager blocked outgoing connection for peer", "peer", p, "addr", raddr, "error", err)
connScope.Done()
return nil, err
}
conn, err := t.maDial(ctx, raddr)
if err != nil {
connScope.Done()
return nil, err
}
// Set linger to 0 so we never get stuck in the TIME-WAIT state. When
Expand All @@ -169,13 +191,14 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID)
tryKeepAlive(conn, true)
c, err := newTracingConn(conn, true)
if err != nil {
connScope.Done()
return nil, err
}
direction := network.DirOutbound
if ok, isClient, _ := network.GetSimultaneousConnect(ctx); ok && !isClient {
direction = network.DirInbound
}
return t.Upgrader.Upgrade(ctx, t, c, direction, p)
return t.Upgrader.Upgrade(ctx, t, c, direction, p, connScope)
}

// UseReuseport returns true if reuseport is enabled and available.
Expand Down
64 changes: 60 additions & 4 deletions tcp_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package tcp

import (
"context"
"errors"
"testing"

"github.com/libp2p/go-libp2p-core/network"

"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
Expand All @@ -11,11 +15,13 @@ import (

csms "github.com/libp2p/go-conn-security-multistream"
mplex "github.com/libp2p/go-libp2p-mplex"
mocknetwork "github.com/libp2p/go-libp2p-testing/mocks/network"
ttransport "github.com/libp2p/go-libp2p-testing/suites/transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"

ma "github.com/multiformats/go-multiaddr"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)

Expand All @@ -26,11 +32,11 @@ func TestTcpTransport(t *testing.T) {

ua, err := tptu.New(ia, new(mplex.Transport))
require.NoError(t, err)
ta, err := NewTCPTransport(ua)
ta, err := NewTCPTransport(ua, nil)
require.NoError(t, err)
ub, err := tptu.New(ib, new(mplex.Transport))
require.NoError(t, err)
tb, err := NewTCPTransport(ub)
tb, err := NewTCPTransport(ub, nil)
require.NoError(t, err)

zero := "/ip4/127.0.0.1/tcp/0"
Expand All @@ -41,13 +47,63 @@ func TestTcpTransport(t *testing.T) {
envReuseportVal = true
}

func TestResourceManager(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

ua, err := tptu.New(ia, new(mplex.Transport))
require.NoError(t, err)
ta, err := NewTCPTransport(ua, nil)
require.NoError(t, err)
ln, err := ta.Listen(ma.StringCast("/ip4/127.0.0.1/tcp/0"))
require.NoError(t, err)
defer ln.Close()

ub, err := tptu.New(ib, new(mplex.Transport))
require.NoError(t, err)
rcmgr := mocknetwork.NewMockResourceManager(ctrl)
tb, err := NewTCPTransport(ub, rcmgr)
require.NoError(t, err)

t.Run("success", func(t *testing.T) {
scope := mocknetwork.NewMockConnManagementScope(ctrl)
rcmgr.EXPECT().OpenConnection(network.DirOutbound, true).Return(scope, nil)
scope.EXPECT().SetPeer(peerA)
scope.EXPECT().PeerScope().Return(network.NullScope).AnyTimes() // called by the upgrader
conn, err := tb.Dial(context.Background(), ln.Multiaddr(), peerA)
require.NoError(t, err)
scope.EXPECT().Done()
defer conn.Close()
})

t.Run("connection denied", func(t *testing.T) {
rerr := errors.New("nope")
rcmgr.EXPECT().OpenConnection(network.DirOutbound, true).Return(nil, rerr)
_, err = tb.Dial(context.Background(), ln.Multiaddr(), peerA)
require.ErrorIs(t, err, rerr)
})

t.Run("peer denied", func(t *testing.T) {
scope := mocknetwork.NewMockConnManagementScope(ctrl)
rcmgr.EXPECT().OpenConnection(network.DirOutbound, true).Return(scope, nil)
rerr := errors.New("nope")
scope.EXPECT().SetPeer(peerA).Return(rerr)
scope.EXPECT().Done()
_, err = tb.Dial(context.Background(), ln.Multiaddr(), peerA)
require.ErrorIs(t, err, rerr)
})
}

func TestTcpTransportCantDialDNS(t *testing.T) {
for i := 0; i < 2; i++ {
dnsa, err := ma.NewMultiaddr("/dns4/example.com/tcp/1234")
require.NoError(t, err)

var u transport.Upgrader
tpt, err := NewTCPTransport(u)
tpt, err := NewTCPTransport(u, nil)
require.NoError(t, err)

if tpt.CanDial(dnsa) {
Expand All @@ -65,7 +121,7 @@ func TestTcpTransportCantListenUtp(t *testing.T) {
require.NoError(t, err)

var u transport.Upgrader
tpt, err := NewTCPTransport(u)
tpt, err := NewTCPTransport(u, nil)
require.NoError(t, err)

_, err = tpt.Listen(utpa)
Expand Down

0 comments on commit e760df1

Please sign in to comment.