Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tcpreuse): add options for sharing TCP listeners amongst TCP, WS and WSS transports #2984

Merged
merged 32 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6281130
feat(tcpreuse): add options for sharing TCP listeners amongst TCP, WS…
aschmahmann Sep 25, 2024
2ae20f4
add tests for listener
sukunrt Oct 2, 2024
05ee445
create conn scope early to prevent DoS attacks
sukunrt Oct 6, 2024
3457a7b
add fx option, move rcmgr and upgrader to sharedtcp
sukunrt Oct 6, 2024
ac8b478
make fewer concurrent connections in test, it breaks mac and windows
sukunrt Oct 7, 2024
6d5fa53
add some comments
sukunrt Oct 7, 2024
0115a62
Add OS specific sampledconn
MarcoPolo Oct 29, 2024
cdbf16c
Return net.Conn unwrapped if possible
MarcoPolo Oct 29, 2024
7f4f875
feat(tcpreuse): add Windows sampledconn
aschmahmann Oct 30, 2024
00c9403
Add test for metrics+unix
MarcoPolo Oct 30, 2024
2c053d8
tcp transport: Parameterize metrics collector in TCP
MarcoPolo Oct 30, 2024
7e34d05
simplify demultiplex a bit
MarcoPolo Oct 30, 2024
e465b29
transport-testsuite(old): Parameterize subtests
MarcoPolo Oct 30, 2024
677222a
tcp-transport: selectively run only 1conn tests
MarcoPolo Oct 30, 2024
8c334af
sampledconn: update tests to be manet aware
MarcoPolo Oct 30, 2024
b686b80
Remove unused interface
MarcoPolo Oct 30, 2024
a28a244
sampledconn: Add test case back in
MarcoPolo Oct 30, 2024
a39490f
Comment nits
MarcoPolo Oct 30, 2024
76f560b
tcpreuse: flip reuseport bool
MarcoPolo Oct 31, 2024
c314a2a
tcpreuse: return an error on multiple listeners for the same addr+con…
MarcoPolo Oct 31, 2024
56dabd4
websocket: return consistent error
MarcoPolo Oct 31, 2024
9ebfc89
Remove todo
MarcoPolo Oct 31, 2024
a722a7c
tcp: revert parameterize metrics collector
MarcoPolo Oct 31, 2024
85be1f5
typo
MarcoPolo Oct 31, 2024
18af914
PR review
MarcoPolo Oct 31, 2024
f9f39b7
typo
MarcoPolo Oct 31, 2024
9ca4ae7
add timeout
MarcoPolo Oct 31, 2024
3d26583
pr nit
MarcoPolo Oct 31, 2024
3487369
remove unused option
MarcoPolo Oct 31, 2024
74e9c05
Expand comment
MarcoPolo Oct 31, 2024
fd53643
remove 0x160304 magic byte match
MarcoPolo Oct 31, 2024
ccd1609
Fix test to handle existing listener error
MarcoPolo Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -145,6 +146,8 @@ type Config struct {
CustomIPv6BlackHoleSuccessCounter bool

UserFxOptions []fx.Option

ShareTCPListener bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -289,6 +292,12 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func(gater connmgr.ConnectionGater, rcmgr network.ResourceManager) *tcpreuse.ConnMgr {
if !cfg.ShareTCPListener {
return nil
}
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, gater, rcmgr)
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
}),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
quicAddrPorts := map[string]struct{}{}
Expand Down
26 changes: 25 additions & 1 deletion libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestTransportConstructor(t *testing.T) {
_ connmgr.ConnectionGater,
upgrader transport.Upgrader,
) transport.Transport {
tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
require.NoError(t, err)
return tpt
}
Expand Down Expand Up @@ -751,3 +751,27 @@ func getTLSConf(t *testing.T, ip net.IP, start, end time.Time) *tls.Config {
}},
}
}

func TestSharedTCPAddr(t *testing.T) {
h, err := New(
ShareTCPListener(),
Transport(tcp.NewTCPTransport),
Transport(websocket.New),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888"),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888/ws"),
)
require.NoError(t, err)
sawTCP := false
sawWS := false
for _, addr := range h.Addrs() {
if strings.HasSuffix(addr.String(), "/tcp/8888") {
sawTCP = true
}
if strings.HasSuffix(addr.String(), "/tcp/8888/ws") {
sawWS = true
}
}
require.True(t, sawTCP)
require.True(t, sawWS)
h.Close()
}
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,15 @@ func WithFxOption(opts ...fx.Option) Option {
return nil
}
}

// ShareTCPListener shares the same listen address between TCP and Websocket
// transports. This lets both transports use the same TCP port.
//
// Currently this behavior is Opt-in. In a future release this will be the
// default, and this option will be removed.
func ShareTCPListener() Option {
return func(cfg *Config) error {
cfg.ShareTCPListener = true
return nil
}
}
2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func makeSwarmWithNoListenAddrs(t *testing.T, opts ...Option) *Swarm {
upgrader := makeUpgrader(t, s)
var tcpOpts []tcp.Option
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDialAddressSelection(t *testing.T) {
s, err := swarm.NewSwarm("local", nil, eventbus.NewBus())
require.NoError(t, err)

tcpTr, err := tcp.NewTCPTransport(nil, nil)
tcpTr, err := tcp.NewTCPTransport(nil, nil, nil)
require.NoError(t, err)
require.NoError(t, s.AddTransport(tcpTr))
reuse, err := quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
Expand Down
8 changes: 4 additions & 4 deletions p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestAddrsForDial(t *testing.T) {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

tpt, err := websocket.New(nil, &network.NullResourceManager{})
tpt, err := websocket.New(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(ResolverFromMaDNS{resolver}))
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestDedupAddrsForDial(t *testing.T) {
require.NoError(t, err)
defer s.Close()

tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand Down Expand Up @@ -134,7 +134,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
})

// Add a tcp transport so that we know we can dial a tcp multiaddr and we don't filter it out.
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand All @@ -151,7 +151,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
err = s.AddTransport(wtTpt)
require.NoError(t, err)

wsTpt, err := websocket.New(nil, &network.NullResourceManager{})
wsTpt, err := websocket.New(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(wsTpt)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func GenSwarm(t testing.TB, opts ...Option) *swarm.Swarm {
if cfg.disableReuseport {
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
}
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand Down
38 changes: 24 additions & 14 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,33 @@ func (l *listener) handleIncoming() {
}
catcher.Reset()

// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
// Check if we already have a connection scope. See the comment in tcpreuse/listener.go for an explanation.
var connScope network.ConnManagementScope
if sc, ok := maconn.(interface {
Scope() network.ConnManagementScope
}); ok {
connScope = sc.Scope()
}
if connScope == nil {
// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by resource manager: %s", err)
var err error
connScope, err = l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to open incoming connection. Rejected by resource manager: %s", err)
}
continue
}
continue
}

// The go routine below calls Release when the context is
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u
upgrader := swarmt.GenUpgrader(t, netw, nil)
upgraders = append(upgraders, upgrader)

tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
25 changes: 24 additions & 1 deletion p2p/test/transport/gating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package transport_integration

import (
"context"
"encoding/binary"
"net/netip"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -30,6 +32,23 @@ func stripCertHash(addr ma.Multiaddr) ma.Multiaddr {
return addr
}

func addrPort(addr ma.Multiaddr) netip.AddrPort {
a := netip.Addr{}
p := uint16(0)
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP4 || c.Protocol().Code == ma.P_IP6 {
a, _ = netip.AddrFromSlice(c.RawValue())
return false
}
if c.Protocol().Code == ma.P_UDP || c.Protocol().Code == ma.P_TCP {
p = binary.BigEndian.Uint16(c.RawValue())
return true
}
return false
})
return netip.AddrPortFrom(a, p)
}

func TestInterceptPeerDial(t *testing.T) {
if race.WithRace() {
t.Skip("The upgrader spawns a new Go routine, which leads to race conditions when using GoMock.")
Expand Down Expand Up @@ -173,10 +192,14 @@ func TestInterceptAccept(t *testing.T) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
}).AnyTimes()
} else if strings.Contains(tc.Name, "WebSocket-Shared") {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, addrPort(h2.Addrs()[0]), addrPort(addrs.LocalMultiaddr()))
})
} else {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr(), "%s\n%s", h2.Addrs()[0], addrs.LocalMultiaddr())
})
}

Expand Down
32 changes: 32 additions & 0 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,38 @@ var transportsToTest = []TransportTestCase{
return h
},
},
{
Name: "TCP-Shared / TLS / Yamux",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.ShareTCPListener())
libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New))
libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket-Shared",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.ShareTCPListener())
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
Expand Down
54 changes: 54 additions & 0 deletions p2p/transport/tcp/metrics_unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// go:build: unix

package tcp

import (
"testing"

tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite"

"github.com/stretchr/testify/require"
)

func TestTcpTransportCollectsMetricsWithSharedTcpSocket(t *testing.T) {

peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

sharedTCPSocketA := tcpreuse.NewConnMgr(false, nil, nil)
sharedTCPSocketB := tcpreuse.NewConnMgr(false, nil, nil)

ua, err := tptu.New(ia, muxers, nil, nil, nil)
require.NoError(t, err)
ta, err := NewTCPTransport(ua, nil, sharedTCPSocketA, WithMetrics())
require.NoError(t, err)
ub, err := tptu.New(ib, muxers, nil, nil, nil)
require.NoError(t, err)
tb, err := NewTCPTransport(ub, nil, sharedTCPSocketB, WithMetrics())
require.NoError(t, err)

zero := "/ip4/127.0.0.1/tcp/0"

// Not running any test that needs more than 1 conn because the testsuite
// opens multiple conns via multiple listeners, which is not expected to work
// with the shared TCP socket.
subtestsToRun := []ttransport.TransportSubTestFn{
ttransport.SubtestProtocols,
ttransport.SubtestBasic,
ttransport.SubtestCancel,
ttransport.SubtestPingPong,

// Stolen from the stream muxer test suite.
ttransport.SubtestStress1Conn1Stream1Msg,
ttransport.SubtestStress1Conn1Stream100Msg,
ttransport.SubtestStress1Conn100Stream100Msg,
ttransport.SubtestStress1Conn1000Stream10Msg,
ttransport.SubtestStress1Conn100Stream100Msg10MB,
ttransport.SubtestStreamOpenStress,
ttransport.SubtestStreamReset,
}

ttransport.SubtestTransportWithFs(t, ta, tb, zero, peerA, subtestsToRun)
}
Loading
Loading