From e760df1f0b8403f76436e0ad8beb2ef5d250846f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 22 Dec 2021 15:11:32 +0400 Subject: [PATCH] use the ResourceManager --- go.mod | 9 ++++---- go.sum | 25 ++++++++++++--------- tcp.go | 27 ++++++++++++++++++++-- tcp_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 104 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 0cd6415..178dc1e 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,13 @@ module github.com/libp2p/go-tcp-transport go 1.16 require ( + github.com/golang/mock v1.6.0 github.com/ipfs/go-log/v2 v2.5.0 github.com/libp2p/go-conn-security-multistream v0.3.0 - github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36 - github.com/libp2p/go-libp2p-mplex v0.4.1 - github.com/libp2p/go-libp2p-testing v0.5.0 - github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0 + github.com/libp2p/go-libp2p-core v0.13.1-0.20220114101623-6b8d8bf59647 + github.com/libp2p/go-libp2p-mplex v0.4.2-0.20220113124821-8bd3fd12e637 + github.com/libp2p/go-libp2p-testing v0.6.1-0.20220114111157-d4fb83f89618 + github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220114112005-92eaefd089ab github.com/libp2p/go-netroute v0.1.5 // indirect github.com/libp2p/go-reuseport v0.1.0 github.com/libp2p/go-reuseport-transport v0.1.0 diff --git a/go.sum b/go.sum index 9634018..bf71bfc 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -163,13 +165,13 @@ github.com/ipfs/go-log v1.0.4/go.mod h1:oDCg2FkjogeFOhqqb+N39l2RpTNPL6F/StPkB3kP github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g= +github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo= github.com/ipfs/go-log/v2 v2.5.0 h1:+MhAooFd9XZNvR0i9FriKW6HB0ql7HNXUuflWtc0dd4= github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= -github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -205,20 +207,21 @@ github.com/libp2p/go-conn-security-multistream v0.3.0/go.mod h1:EEP47t4fw/bTelVm github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0= -github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.10.0/go.mod h1:ECdxehoYosLYHgDDFa2N4yE8Y7aQRAMf0sX9mf2sbGg= -github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36 h1:b/pMmgc5EV+dqSc+MjkX5xPa1nV6EKiOb0L0XT03Lic= -github.com/libp2p/go-libp2p-core v0.13.1-0.20220104083644-a3dd401efe36/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE= -github.com/libp2p/go-libp2p-mplex v0.4.1 h1:/pyhkP1nLwjG3OM+VuaNJkQT/Pqq73WzB3aDN3Fx1sc= -github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g= +github.com/libp2p/go-libp2p-core v0.13.1-0.20220113111946-0391e674afe4/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE= +github.com/libp2p/go-libp2p-core v0.13.1-0.20220114101623-6b8d8bf59647 h1:r72wIR0ywdBUSTodR3WyY1Fdb9FUCHcYGT8NWxQH7wY= +github.com/libp2p/go-libp2p-core v0.13.1-0.20220114101623-6b8d8bf59647/go.mod h1:KlkHsZ0nKerWsXLZJm3LfFQwusI5k3iN4BgtYTE4IYE= +github.com/libp2p/go-libp2p-mplex v0.4.2-0.20220113124821-8bd3fd12e637 h1:yYLuo4pZ4JiKojIFkGYnxJ1Nrdr6qZX6jPrjjG5wKio= +github.com/libp2p/go-libp2p-mplex v0.4.2-0.20220113124821-8bd3fd12e637/go.mod h1:avzhJCZQaBxFEPplk5OTe5C9njPax2LRXBZHuOWwpsw= github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k= github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-testing v0.1.2-0.20200422005655-8775583591d8/go.mod h1:Qy8sAncLKpwXtS2dSnDOP8ktexIAHKu+J+pnZOFZLTc= -github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0= -github.com/libp2p/go-libp2p-testing v0.5.0 h1:bTjC29TTQ/ODq0ld3+0KLq3irdA5cAH3OMbRi0/QsvE= github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A= -github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0 h1:eD/QJCpcImYOUl6MdBuxMByVaEe5VMm463zJG6oUg9o= -github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220104084635-5fc0a74b41f0/go.mod h1:ByIyNe8asQhgcyIHetb4f+UgV+hDrA8pQ3L/TgNs+RI= +github.com/libp2p/go-libp2p-testing v0.6.1-0.20220113123347-8dda553fe7f3/go.mod h1:VeazaHJZ7jwgje7xyb46LwP+9AhvIYliIq1FE/vuTMk= +github.com/libp2p/go-libp2p-testing v0.6.1-0.20220114111157-d4fb83f89618 h1:vzHomenj6MtCKN0OwFyCha/MA+9P6TO1p4Tvvj4kM/g= +github.com/libp2p/go-libp2p-testing v0.6.1-0.20220114111157-d4fb83f89618/go.mod h1:/UYj9SOREV7S5U7QUhp7mZnAmu3N+apRuyqIp+hMd7M= +github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220114112005-92eaefd089ab h1:uWnpS1h44xaoMghNV/cyhLpIUGWwvrMOIMAXi1NplJs= +github.com/libp2p/go-libp2p-transport-upgrader v0.6.1-0.20220114112005-92eaefd089ab/go.mod h1:WJv0GCzEmy+wJlAf9gKtGDHt8cZ0UVS+Y6ubHUHu2Sw= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= github.com/libp2p/go-mplex v0.3.0 h1:U1T+vmCYJaEoDJPV1aq31N56hS+lJgb397GsylNSgrU= github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ= @@ -427,7 +430,6 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -560,6 +562,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/tcp.go b/tcp.go index d21dcb0..e9de3b3 100644 --- a/tcp.go +++ b/tcp.go @@ -83,6 +83,10 @@ func (ll *tcpListener) Accept() (manet.Conn, error) { } tryLinger(c, ll.sec) tryKeepAlive(c, true) + // We're not calling OpenConnection in the resource manager here, + // since the manet.Conn doesn't allow us to save the scope. + // It's the caller's (usually the go-libp2p-transport-upgrader) responsibility + // to call the resource manager. return c, nil } @@ -94,6 +98,7 @@ func DisableReuseport() Option { return nil } } + func WithConnectionTimeout(d time.Duration) Option { return func(tr *TcpTransport) error { tr.connectTimeout = d @@ -113,6 +118,8 @@ type TcpTransport struct { // TCP connect timeout connectTimeout time.Duration + rcmgr network.ResourceManager + reuse rtpt.Transport } @@ -120,10 +127,14 @@ var _ transport.Transport = &TcpTransport{} // NewTCPTransport creates a tcp transport object that tracks dialers and listeners // created. It represents an entire TCP stack (though it might not necessarily be). -func NewTCPTransport(upgrader transport.Upgrader, opts ...Option) (*TcpTransport, error) { +func NewTCPTransport(upgrader transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*TcpTransport, error) { + if rcmgr == nil { + rcmgr = network.NullResourceManager + } tr := &TcpTransport{ Upgrader: upgrader, connectTimeout: defaultConnectTimeout, // can be set by using the WithConnectionTimeout option + rcmgr: rcmgr, } for _, o := range opts { if err := o(tr); err != nil { @@ -158,8 +169,19 @@ func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Co // Dial dials the peer at the remote address. func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) { + connScope, err := t.rcmgr.OpenConnection(network.DirOutbound, true) + if err != nil { + log.Debugw("resource manager blocked outgoing connection", "peer", p, "addr", raddr, "error", err) + return nil, err + } + if err := connScope.SetPeer(p); err != nil { + log.Debugw("resource manager blocked outgoing connection for peer", "peer", p, "addr", raddr, "error", err) + connScope.Done() + return nil, err + } conn, err := t.maDial(ctx, raddr) if err != nil { + connScope.Done() return nil, err } // Set linger to 0 so we never get stuck in the TIME-WAIT state. When @@ -169,13 +191,14 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) tryKeepAlive(conn, true) c, err := newTracingConn(conn, true) if err != nil { + connScope.Done() return nil, err } direction := network.DirOutbound if ok, isClient, _ := network.GetSimultaneousConnect(ctx); ok && !isClient { direction = network.DirInbound } - return t.Upgrader.Upgrade(ctx, t, c, direction, p) + return t.Upgrader.Upgrade(ctx, t, c, direction, p, connScope) } // UseReuseport returns true if reuseport is enabled and available. diff --git a/tcp_test.go b/tcp_test.go index 21a12bd..b650e9c 100644 --- a/tcp_test.go +++ b/tcp_test.go @@ -1,8 +1,12 @@ package tcp import ( + "context" + "errors" "testing" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/sec" @@ -11,11 +15,13 @@ import ( csms "github.com/libp2p/go-conn-security-multistream" mplex "github.com/libp2p/go-libp2p-mplex" + mocknetwork "github.com/libp2p/go-libp2p-testing/mocks/network" ttransport "github.com/libp2p/go-libp2p-testing/suites/transport" tptu "github.com/libp2p/go-libp2p-transport-upgrader" ma "github.com/multiformats/go-multiaddr" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -26,11 +32,11 @@ func TestTcpTransport(t *testing.T) { ua, err := tptu.New(ia, new(mplex.Transport)) require.NoError(t, err) - ta, err := NewTCPTransport(ua) + ta, err := NewTCPTransport(ua, nil) require.NoError(t, err) ub, err := tptu.New(ib, new(mplex.Transport)) require.NoError(t, err) - tb, err := NewTCPTransport(ub) + tb, err := NewTCPTransport(ub, nil) require.NoError(t, err) zero := "/ip4/127.0.0.1/tcp/0" @@ -41,13 +47,63 @@ func TestTcpTransport(t *testing.T) { envReuseportVal = true } +func TestResourceManager(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + peerA, ia := makeInsecureMuxer(t) + _, ib := makeInsecureMuxer(t) + + ua, err := tptu.New(ia, new(mplex.Transport)) + require.NoError(t, err) + ta, err := NewTCPTransport(ua, nil) + require.NoError(t, err) + ln, err := ta.Listen(ma.StringCast("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + defer ln.Close() + + ub, err := tptu.New(ib, new(mplex.Transport)) + require.NoError(t, err) + rcmgr := mocknetwork.NewMockResourceManager(ctrl) + tb, err := NewTCPTransport(ub, rcmgr) + require.NoError(t, err) + + t.Run("success", func(t *testing.T) { + scope := mocknetwork.NewMockConnManagementScope(ctrl) + rcmgr.EXPECT().OpenConnection(network.DirOutbound, true).Return(scope, nil) + scope.EXPECT().SetPeer(peerA) + scope.EXPECT().PeerScope().Return(network.NullScope).AnyTimes() // called by the upgrader + conn, err := tb.Dial(context.Background(), ln.Multiaddr(), peerA) + require.NoError(t, err) + scope.EXPECT().Done() + defer conn.Close() + }) + + t.Run("connection denied", func(t *testing.T) { + rerr := errors.New("nope") + rcmgr.EXPECT().OpenConnection(network.DirOutbound, true).Return(nil, rerr) + _, err = tb.Dial(context.Background(), ln.Multiaddr(), peerA) + require.ErrorIs(t, err, rerr) + }) + + t.Run("peer denied", func(t *testing.T) { + scope := mocknetwork.NewMockConnManagementScope(ctrl) + rcmgr.EXPECT().OpenConnection(network.DirOutbound, true).Return(scope, nil) + rerr := errors.New("nope") + scope.EXPECT().SetPeer(peerA).Return(rerr) + scope.EXPECT().Done() + _, err = tb.Dial(context.Background(), ln.Multiaddr(), peerA) + require.ErrorIs(t, err, rerr) + }) +} + func TestTcpTransportCantDialDNS(t *testing.T) { for i := 0; i < 2; i++ { dnsa, err := ma.NewMultiaddr("/dns4/example.com/tcp/1234") require.NoError(t, err) var u transport.Upgrader - tpt, err := NewTCPTransport(u) + tpt, err := NewTCPTransport(u, nil) require.NoError(t, err) if tpt.CanDial(dnsa) { @@ -65,7 +121,7 @@ func TestTcpTransportCantListenUtp(t *testing.T) { require.NoError(t, err) var u transport.Upgrader - tpt, err := NewTCPTransport(u) + tpt, err := NewTCPTransport(u, nil) require.NoError(t, err) _, err = tpt.Listen(utpa)