From 8dea1942614acc6442722ed153e8add5e326c842 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 20 Nov 2022 13:23:45 +1300 Subject: [PATCH 1/4] core/network: rename ConnectionState.NextProto to StreamMultiplexer --- core/network/conn.go | 2 +- p2p/net/upgrader/conn.go | 2 +- p2p/net/upgrader/upgrader.go | 2 +- p2p/security/noise/session.go | 2 +- p2p/security/noise/transport_test.go | 4 ++-- p2p/security/tls/transport.go | 2 +- p2p/security/tls/transport_test.go | 4 ++-- p2p/test/muxer-negotiation/muxer_test.go | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/network/conn.go b/core/network/conn.go index 18414b062c..550b20c27f 100644 --- a/core/network/conn.go +++ b/core/network/conn.go @@ -39,7 +39,7 @@ type ConnectionState struct { // The next protocol used for stream muxer selection. This is derived from // security protocol handshake, for example, Noise handshake payload or // TLS/ALPN negotiation. - NextProto string + StreamMultiplexer string } // ConnSecurity is the interface that one can mix into a connection interface to diff --git a/p2p/net/upgrader/conn.go b/p2p/net/upgrader/conn.go index fa73649144..e3c87547cd 100644 --- a/p2p/net/upgrader/conn.go +++ b/p2p/net/upgrader/conn.go @@ -54,5 +54,5 @@ func (t *transportConn) Close() error { } func (t *transportConn) ConnState() network.ConnectionState { - return network.ConnectionState{NextProto: string(t.muxer)} + return network.ConnectionState{StreamMultiplexer: string(t.muxer)} } diff --git a/p2p/net/upgrader/upgrader.go b/p2p/net/upgrader/upgrader.go index 403034db3a..da893d406e 100644 --- a/p2p/net/upgrader/upgrader.go +++ b/p2p/net/upgrader/upgrader.go @@ -236,7 +236,7 @@ func (u *upgrader) getMuxerByID(id string) *StreamMuxer { } func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (protocol.ID, network.MuxedConn, error) { - muxerSelected := conn.ConnState().NextProto + muxerSelected := conn.ConnState().StreamMultiplexer // Use muxer selected from security handshake if available. Otherwise fall back to multistream-selection. if len(muxerSelected) > 0 { m := u.getMuxerByID(muxerSelected) diff --git a/p2p/security/noise/session.go b/p2p/security/noise/session.go index f7e75e64ae..93ce5217be 100644 --- a/p2p/security/noise/session.go +++ b/p2p/security/noise/session.go @@ -136,7 +136,7 @@ func (s *secureSession) Close() error { func SessionWithConnState(s *secureSession, muxer string) *secureSession { if s != nil { - s.connectionState.NextProto = muxer + s.connectionState.StreamMultiplexer = muxer } return s } diff --git a/p2p/security/noise/transport_test.go b/p2p/security/noise/transport_test.go index 056afa3980..0912dab448 100644 --- a/p2p/security/noise/transport_test.go +++ b/p2p/security/noise/transport_test.go @@ -689,8 +689,8 @@ func TestHandshakeWithTransportEarlyData(t *testing.T) { defer initConn.Close() defer respConn.Close() - require.Equal(t, expectedProto, initConn.connectionState.NextProto) - require.Equal(t, expectedProto, respConn.connectionState.NextProto) + require.Equal(t, expectedProto, initConn.connectionState.StreamMultiplexer) + require.Equal(t, expectedProto, respConn.connectionState.StreamMultiplexer) initData := []byte("Test data for noise transport") _, err := initConn.Write(initData) diff --git a/p2p/security/tls/transport.go b/p2p/security/tls/transport.go index a78abec279..b036bf8911 100644 --- a/p2p/security/tls/transport.go +++ b/p2p/security/tls/transport.go @@ -171,7 +171,7 @@ func (t *Transport) setupConn(tlsConn *tls.Conn, remotePubKey ci.PubKey) (sec.Se privKey: t.privKey, remotePeer: remotePeerID, remotePubKey: remotePubKey, - connectionState: network.ConnectionState{NextProto: nextProto}, + connectionState: network.ConnectionState{StreamMultiplexer: nextProto}, }, nil } diff --git a/p2p/security/tls/transport_test.go b/p2p/security/tls/transport_test.go index 0d42eb1e80..aa3d3edacc 100644 --- a/p2p/security/tls/transport_test.go +++ b/p2p/security/tls/transport_test.go @@ -255,7 +255,7 @@ func TestHandshakeWithNextProtoSucceeds(t *testing.T) { require.Equal(t, serverConn.RemotePeer(), clientID) require.True(t, clientConn.RemotePublicKey().Equals(serverKey.GetPublic()), "server public key mismatch") require.True(t, serverConn.RemotePublicKey().Equals(clientKey.GetPublic()), "client public key mismatch") - require.Equal(t, clientConn.ConnState().NextProto, expectedMuxer) + require.Equal(t, clientConn.ConnState().StreamMultiplexer, expectedMuxer) // exchange some data _, err = serverConn.Write([]byte("foobar")) require.NoError(t, err) @@ -265,7 +265,7 @@ func TestHandshakeWithNextProtoSucceeds(t *testing.T) { require.Equal(t, string(b), "foobar") } - // Iterate through the NextProto combinations. + // Iterate through the StreamMultiplexer combinations. for _, test := range tests { clientMuxers := make([]tptu.StreamMuxer, 0, len(test.clientProtos)) for _, id := range test.clientProtos { diff --git a/p2p/test/muxer-negotiation/muxer_test.go b/p2p/test/muxer-negotiation/muxer_test.go index a36bfbde5f..2bfb880615 100644 --- a/p2p/test/muxer-negotiation/muxer_test.go +++ b/p2p/test/muxer-negotiation/muxer_test.go @@ -119,7 +119,7 @@ func TestMuxerNegotiation(t *testing.T) { require.NoError(t, err) conns := client.Network().ConnsToPeer(server.ID()) require.Len(t, conns, 1, "expected exactly one connection") - require.Equal(t, tc.Expected, protocol.ID(conns[0].ConnState().NextProto)) + require.Equal(t, tc.Expected, protocol.ID(conns[0].ConnState().StreamMultiplexer)) }) } } From 9e136c253114e2e0fab8001a911b2131e1852531 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 20 Nov 2022 14:29:49 +1300 Subject: [PATCH 2/4] swarm: move the additional peer ID check to the swarm Only a subset of the transports use the ugprader (which uses the SSMuxer). It's better to perform this check for every transport. --- p2p/net/conn-security-multistream/ssms.go | 17 ++++------------- p2p/net/swarm/swarm_dial.go | 7 +++++++ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/p2p/net/conn-security-multistream/ssms.go b/p2p/net/conn-security-multistream/ssms.go index 595d8dfde6..676dace798 100644 --- a/p2p/net/conn-security-multistream/ssms.go +++ b/p2p/net/conn-security-multistream/ssms.go @@ -3,7 +3,6 @@ package csms import ( "context" "fmt" - "log" "net" "github.com/libp2p/go-libp2p/core/peer" @@ -57,23 +56,15 @@ func (sm *SSMuxer) SecureOutbound(ctx context.Context, insecure net.Conn, p peer return nil, false, err } - var sconn sec.SecureConn if server { - sconn, err = tpt.SecureInbound(ctx, insecure, p) + sconn, err := tpt.SecureInbound(ctx, insecure, p) if err != nil { return nil, false, fmt.Errorf("failed to secure inbound connection: %s", err) } - // ensure the correct peer connected to us - if sconn.RemotePeer() != p { - sconn.Close() - log.Printf("Handshake failed to properly authenticate peer. Authenticated %s, expected %s.", sconn.RemotePeer(), p) - return nil, false, fmt.Errorf("unexpected peer") - } - } else { - sconn, err = tpt.SecureOutbound(ctx, insecure, p) + return sconn, true, nil } - - return sconn, server, err + sconn, err := tpt.SecureOutbound(ctx, insecure, p) + return sconn, false, err } func (sm *SSMuxer) selectProto(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, bool, error) { diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index da9b10c174..29703f7747 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -269,6 +269,13 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { conn, err = s.dsync.Dial(ctx, p) if err == nil { + // Ensure we connected to the correct peer. + // This was most likely already checked by the security protocol, but it doesn't hurt do it again here. + if conn.RemotePeer() != p { + conn.Close() + log.Errorw("Handshake failed to properly authenticate peer", "authenticated", conn.RemotePeer(), "expected", p) + return nil, fmt.Errorf("unexpected peer") + } return conn, nil } From e2a246d5b64f01d5881e9ae2660bc8f773b90402 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 20 Nov 2022 16:43:53 +1300 Subject: [PATCH 3/4] upgrader: absorb SSMuxer into the upgrader --- config/config.go | 30 +++-- config/security.go | 23 ---- core/sec/security.go | 15 --- p2p/net/conn-security-multistream/ssms.go | 101 --------------- .../conn-security-multistream/ssms_test.go | 121 ------------------ p2p/net/swarm/dial_worker_test.go | 7 +- p2p/net/swarm/testing/testing.go | 7 +- p2p/net/upgrader/listener_test.go | 48 ++++--- p2p/net/upgrader/upgrader.go | 95 +++++++++++--- p2p/net/upgrader/upgrader_test.go | 19 +-- p2p/transport/tcp/tcp_test.go | 7 +- p2p/transport/websocket/websocket_test.go | 21 +-- 12 files changed, 152 insertions(+), 342 deletions(-) delete mode 100644 config/security.go delete mode 100644 p2p/net/conn-security-multistream/ssms.go delete mode 100644 p2p/net/conn-security-multistream/ssms_test.go diff --git a/config/config.go b/config/config.go index ea674c0d9b..d98d3d91fc 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,8 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/pnet" "github.com/libp2p/go-libp2p/core/routing" + "github.com/libp2p/go-libp2p/core/sec" + "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/autorelay" @@ -167,20 +169,9 @@ func (cfg *Config) addTransports(h host.Host) error { return fmt.Errorf("swarm does not support transports") } - var security []fx.Option - if cfg.Insecure { - security = append(security, fx.Provide(makeInsecureTransport)) - } else { - security = cfg.SecurityTransports - } - fxopts := []fx.Option{ fx.WithLogger(func() fxevent.Logger { return getFXLogger() }), - fx.Provide(tptu.New), - fx.Provide(fx.Annotate( - makeSecurityMuxer, - fx.ParamTags(`group:"security"`), - )), + fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`group:"security"`))), fx.Supply(cfg.Muxers), fx.Supply(h.ID()), fx.Provide(func() host.Host { return h }), @@ -191,8 +182,19 @@ func (cfg *Config) addTransports(h host.Host) error { fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }), } fxopts = append(fxopts, cfg.Transports...) - if !cfg.Insecure { - fxopts = append(fxopts, security...) + if cfg.Insecure { + fxopts = append(fxopts, + fx.Provide( + fx.Annotate( + func(id peer.ID, priv crypto.PrivKey) sec.SecureTransport { + return insecure.NewWithIdentity(insecure.ID, id, priv) + }, + fx.ResultTags(`group:"security"`), + ), + ), + ) + } else { + fxopts = append(fxopts, cfg.SecurityTransports...) } fxopts = append(fxopts, fx.Invoke( diff --git a/config/security.go b/config/security.go deleted file mode 100644 index 1a88575f5d..0000000000 --- a/config/security.go +++ /dev/null @@ -1,23 +0,0 @@ -package config - -import ( - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/sec" - "github.com/libp2p/go-libp2p/core/sec/insecure" - csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" -) - -func makeInsecureTransport(id peer.ID, privKey crypto.PrivKey) sec.SecureMuxer { - secMuxer := new(csms.SSMuxer) - secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, privKey)) - return secMuxer -} - -func makeSecurityMuxer(tpts []sec.SecureTransport) sec.SecureMuxer { - secMuxer := new(csms.SSMuxer) - for _, tpt := range tpts { - secMuxer.AddTransport(string(tpt.ID()), tpt) - } - return secMuxer -} diff --git a/core/sec/security.go b/core/sec/security.go index 8b733b5d06..83059d94ca 100644 --- a/core/sec/security.go +++ b/core/sec/security.go @@ -29,18 +29,3 @@ type SecureTransport interface { // ID is the protocol ID of the security protocol. ID() protocol.ID } - -// A SecureMuxer is a wrapper around SecureTransport which can select security protocols -// and open outbound connections with simultaneous open. -type SecureMuxer interface { - // SecureInbound secures an inbound connection. - // The returned boolean indicates whether the connection should be treated as a server - // connection; in the case of SecureInbound it should always be true. - // If p is empty, connections from any peer are accepted. - SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (SecureConn, bool, error) - - // SecureOutbound secures an outbound connection. - // The returned boolean indicates whether the connection should be treated as a server - // connection due to simultaneous open. - SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (SecureConn, bool, error) -} diff --git a/p2p/net/conn-security-multistream/ssms.go b/p2p/net/conn-security-multistream/ssms.go deleted file mode 100644 index 676dace798..0000000000 --- a/p2p/net/conn-security-multistream/ssms.go +++ /dev/null @@ -1,101 +0,0 @@ -package csms - -import ( - "context" - "fmt" - "net" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/sec" - - mss "github.com/multiformats/go-multistream" -) - -// SSMuxer is a multistream stream security transport multiplexer. -// -// SSMuxer is safe to use without initialization. However, it's not safe to move -// after use. -type SSMuxer struct { - mux mss.MultistreamMuxer - tpts map[string]sec.SecureTransport - OrderPreference []string -} - -var _ sec.SecureMuxer = (*SSMuxer)(nil) - -// AddTransport adds a stream security transport to this multistream muxer. -// -// This method is *not* thread-safe. It should be called only when initializing -// the SSMuxer. -func (sm *SSMuxer) AddTransport(path string, transport sec.SecureTransport) { - if sm.tpts == nil { - sm.tpts = make(map[string]sec.SecureTransport, 1) - } - - sm.mux.AddHandler(path, nil) - sm.tpts[path] = transport - sm.OrderPreference = append(sm.OrderPreference, path) -} - -// SecureInbound secures an inbound connection using this multistream -// multiplexed stream security transport. -func (sm *SSMuxer) SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) { - tpt, _, err := sm.selectProto(ctx, insecure, true) - if err != nil { - return nil, false, err - } - sconn, err := tpt.SecureInbound(ctx, insecure, p) - return sconn, true, err -} - -// SecureOutbound secures an outbound connection using this multistream -// multiplexed stream security transport. -func (sm *SSMuxer) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) { - tpt, server, err := sm.selectProto(ctx, insecure, false) - if err != nil { - return nil, false, err - } - - if server { - sconn, err := tpt.SecureInbound(ctx, insecure, p) - if err != nil { - return nil, false, fmt.Errorf("failed to secure inbound connection: %s", err) - } - return sconn, true, nil - } - sconn, err := tpt.SecureOutbound(ctx, insecure, p) - return sconn, false, err -} - -func (sm *SSMuxer) selectProto(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, bool, error) { - var proto string - var err error - var iamserver bool - done := make(chan struct{}) - go func() { - defer close(done) - if server { - iamserver = true - proto, _, err = sm.mux.Negotiate(insecure) - } else { - proto, iamserver, err = mss.SelectWithSimopenOrFail(sm.OrderPreference, insecure) - } - }() - - select { - case <-done: - if err != nil { - return nil, false, err - } - if tpt, ok := sm.tpts[proto]; ok { - return tpt, iamserver, nil - } - return nil, false, fmt.Errorf("selected unknown security transport") - case <-ctx.Done(): - // We *must* do this. We have outstanding work on the connection - // and it's no longer safe to use. - insecure.Close() - <-done // wait to stop using the connection. - return nil, false, ctx.Err() - } -} diff --git a/p2p/net/conn-security-multistream/ssms_test.go b/p2p/net/conn-security-multistream/ssms_test.go deleted file mode 100644 index 3ccf4a7f26..0000000000 --- a/p2p/net/conn-security-multistream/ssms_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package csms - -import ( - "context" - "crypto/rand" - "io" - "net" - "sync" - "testing" - - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/sec" - "github.com/libp2p/go-libp2p/core/sec/insecure" - - "github.com/stretchr/testify/require" -) - -func newPeer(t *testing.T) (crypto.PrivKey, peer.ID) { - priv, _, err := crypto.GenerateEd25519Key(rand.Reader) - require.NoError(t, err) - id, err := peer.IDFromPrivateKey(priv) - require.NoError(t, err) - return priv, id -} - -type TransportAdapter struct { - mux *SSMuxer -} - -func (sm *TransportAdapter) SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) { - sconn, _, err := sm.mux.SecureInbound(ctx, insecure, p) - return sconn, err -} - -func (sm *TransportAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) { - sconn, _, err := sm.mux.SecureOutbound(ctx, insecure, p) - return sconn, err -} - -func TestCommonProto(t *testing.T) { - privA, idA := newPeer(t) - privB, idB := newPeer(t) - - var at, bt SSMuxer - - atInsecure := insecure.NewWithIdentity(insecure.ID, idA, privA) - btInsecure := insecure.NewWithIdentity(insecure.ID, idB, privB) - at.AddTransport("/plaintext/1.0.0", atInsecure) - bt.AddTransport("/plaintext/1.1.0", btInsecure) - bt.AddTransport("/plaintext/1.0.0", btInsecure) - - ln, err := net.Listen("tcp", "localhost:0") - require.NoError(t, err) - muxB := &TransportAdapter{mux: &bt} - connChan := make(chan sec.SecureConn) - go func() { - conn, err := ln.Accept() - require.NoError(t, err) - c, err := muxB.SecureInbound(context.Background(), conn, idA) - require.NoError(t, err) - connChan <- c - }() - - muxA := &TransportAdapter{mux: &at} - - cconn, err := net.Dial("tcp", ln.Addr().String()) - require.NoError(t, err) - - cc, err := muxA.SecureOutbound(context.Background(), cconn, idB) - require.NoError(t, err) - require.Equal(t, cc.LocalPeer(), idA) - require.Equal(t, cc.RemotePeer(), idB) - _, err = cc.Write([]byte("foobar")) - require.NoError(t, err) - require.NoError(t, cc.Close()) - - sc := <-connChan - require.Equal(t, sc.LocalPeer(), idB) - require.Equal(t, sc.RemotePeer(), idA) - b, err := io.ReadAll(sc) - require.NoError(t, err) - require.Equal(t, "foobar", string(b)) -} - -func TestNoCommonProto(t *testing.T) { - privA, idA := newPeer(t) - privB, idB := newPeer(t) - - var at, bt SSMuxer - atInsecure := insecure.NewWithIdentity(insecure.ID, idA, privA) - btInsecure := insecure.NewWithIdentity(insecure.ID, idB, privB) - - at.AddTransport("/plaintext/1.0.0", atInsecure) - bt.AddTransport("/plaintext/1.1.0", btInsecure) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - a, b := net.Pipe() - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - defer a.Close() - _, _, err := at.SecureInbound(ctx, a, "") - if err == nil { - t.Error("connection should have failed") - } - }() - - go func() { - defer wg.Done() - defer b.Close() - _, _, err := bt.SecureOutbound(ctx, b, "peerA") - if err == nil { - t.Error("connection should have failed") - } - }() - wg.Wait() -} diff --git a/p2p/net/swarm/dial_worker_test.go b/p2p/net/swarm/dial_worker_test.go index 14ff3d64d5..346550c1ea 100644 --- a/p2p/net/swarm/dial_worker_test.go +++ b/p2p/net/swarm/dial_worker_test.go @@ -12,11 +12,11 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -75,10 +75,9 @@ func makeSwarm(t *testing.T) *Swarm { func makeUpgrader(t *testing.T, n *Swarm) transport.Upgrader { id := n.LocalPeer() pk := n.Peerstore().PrivKey(id) - secMuxer := new(csms.SSMuxer) - secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, pk)) + st := insecure.NewWithIdentity(insecure.ID, id, pk) - u, err := tptu.New(secMuxer, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, nil) + u, err := tptu.New([]sec.SecureTransport{st}, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, nil) require.NoError(t, err) return u } diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go index 8b9420c444..08fe7ad0f3 100644 --- a/p2p/net/swarm/testing/testing.go +++ b/p2p/net/swarm/testing/testing.go @@ -12,11 +12,11 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" "github.com/libp2p/go-libp2p/p2p/net/swarm" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" @@ -101,10 +101,9 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option { func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater, opts ...tptu.Option) transport.Upgrader { id := n.LocalPeer() pk := n.Peerstore().PrivKey(id) - secMuxer := new(csms.SSMuxer) - secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, pk)) + st := insecure.NewWithIdentity(insecure.ID, id, pk) - u, err := tptu.New(secMuxer, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, connGater, opts...) + u, err := tptu.New([]sec.SecureTransport{st}, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, connGater, opts...) require.NoError(t, err) return u } diff --git a/p2p/net/upgrader/listener_test.go b/p2p/net/upgrader/listener_test.go index 0d872d1d12..26b13f1b2c 100644 --- a/p2p/net/upgrader/listener_test.go +++ b/p2p/net/upgrader/listener_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "io" - "net" "os" "sync" "sync/atomic" @@ -15,6 +14,7 @@ import ( mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/sec" + "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/net/upgrader" @@ -24,22 +24,6 @@ import ( "github.com/stretchr/testify/require" ) -type MuxAdapter struct { - tpt sec.SecureTransport -} - -var _ sec.SecureMuxer = &MuxAdapter{} - -func (mux *MuxAdapter) SecureInbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) { - sconn, err := mux.tpt.SecureInbound(ctx, insecure, p) - return sconn, true, err -} - -func (mux *MuxAdapter) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, bool, error) { - sconn, err := mux.tpt.SecureOutbound(ctx, insecure, p) - return sconn, false, err -} - func createListener(t *testing.T, u transport.Upgrader) transport.Listener { t.Helper() addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") @@ -401,3 +385,33 @@ func TestListenerResourceManagementDenied(t *testing.T) { require.NoError(t, ln.Close()) <-done } + +func TestNoCommonSecurityProto(t *testing.T) { + idA, privA := newPeer(t) + idB, privB := newPeer(t) + atInsecure := insecure.NewWithIdentity("/plaintext1", idA, privA) + btInsecure := insecure.NewWithIdentity("/plaintext2", idB, privB) + + ua, err := upgrader.New([]sec.SecureTransport{atInsecure}, []upgrader.StreamMuxer{{ID: "negotiate", Muxer: &negotiatingMuxer{}}}, nil, nil, nil) + require.NoError(t, err) + ub, err := upgrader.New([]sec.SecureTransport{btInsecure}, []upgrader.StreamMuxer{{ID: "negotiate", Muxer: &negotiatingMuxer{}}}, nil, nil, nil) + require.NoError(t, err) + + ln := createListener(t, ua) + done := make(chan struct{}) + go func() { + defer close(done) + ln.Accept() + }() + + _, err = dial(t, ub, ln.Multiaddrs()[0], idA, &network.NullScope{}) + require.EqualError(t, err, "failed to negotiate security protocol: protocol not supported") + select { + case <-done: + t.Fatal("didn't expect to accept a connection") + case <-time.After(50 * time.Millisecond): + } + + ln.Close() + <-done +} diff --git a/p2p/net/upgrader/upgrader.go b/p2p/net/upgrader/upgrader.go index da893d406e..5c15417d46 100644 --- a/p2p/net/upgrader/upgrader.go +++ b/p2p/net/upgrader/upgrader.go @@ -49,15 +49,17 @@ type StreamMuxer struct { // Upgrader is a multistream upgrader that can upgrade an underlying connection // to a full transport connection (secure and multiplexed). type upgrader struct { - secure sec.SecureMuxer - psk ipnet.PSK connGater connmgr.ConnectionGater rcmgr network.ResourceManager - msmuxer *mss.MultistreamMuxer - muxers []StreamMuxer - muxerIDs []string + muxerMuxer *mss.MultistreamMuxer + muxers []StreamMuxer + muxerIDs []string + + security []sec.SecureTransport + securityMuxer *mss.MultistreamMuxer + securityIDs []string // AcceptTimeout is the maximum duration an Accept is allowed to take. // This includes the time between accepting the raw network connection, @@ -69,15 +71,16 @@ type upgrader struct { var _ transport.Upgrader = &upgrader{} -func New(secureMuxer sec.SecureMuxer, muxers []StreamMuxer, psk ipnet.PSK, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...Option) (transport.Upgrader, error) { +func New(security []sec.SecureTransport, muxers []StreamMuxer, psk ipnet.PSK, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...Option) (transport.Upgrader, error) { u := &upgrader{ - secure: secureMuxer, acceptTimeout: defaultAcceptTimeout, rcmgr: rcmgr, connGater: connGater, psk: psk, - msmuxer: mss.NewMultistreamMuxer(), + muxerMuxer: mss.NewMultistreamMuxer(), muxers: muxers, + security: security, + securityMuxer: mss.NewMultistreamMuxer(), } for _, opt := range opts { if err := opt(u); err != nil { @@ -89,9 +92,14 @@ func New(secureMuxer sec.SecureMuxer, muxers []StreamMuxer, psk ipnet.PSK, rcmgr } u.muxerIDs = make([]string, 0, len(muxers)) for _, m := range muxers { - u.msmuxer.AddHandler(string(m.ID), nil) + u.muxerMuxer.AddHandler(string(m.ID), nil) u.muxerIDs = append(u.muxerIDs, string(m.ID)) } + u.securityIDs = make([]string, 0, len(security)) + for _, s := range security { + u.securityMuxer.AddHandler(string(s.ID()), nil) + u.securityIDs = append(u.securityIDs, string(s.ID())) + } return u, nil } @@ -156,7 +164,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma log.Errorw("failed to close connection", "peer", p, "addr", maconn.RemoteMultiaddr(), "error", err) } return nil, fmt.Errorf("gater rejected connection with peer %s and addr %s with direction %d", - sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir) + sconn.RemotePeer(), maconn.RemoteMultiaddr(), dir) } // Only call SetPeer if it hasn't already been set -- this can happen when we don't know // the peer in advance and in some bug scenarios. @@ -167,7 +175,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma log.Errorw("failed to close connection", "peer", p, "addr", maconn.RemoteMultiaddr(), "error", err) } return nil, fmt.Errorf("resource manager connection with peer %s and addr %s with direction %d", - sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir) + sconn.RemotePeer(), maconn.RemoteMultiaddr(), dir) } } @@ -190,10 +198,19 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma } func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, dir network.Direction) (sec.SecureConn, bool, error) { - if dir == network.DirInbound { - return u.secure.SecureInbound(ctx, conn, p) + isServer := dir == network.DirInbound + var st sec.SecureTransport + var err error + st, isServer, err = u.negotiateSecurity(ctx, conn, isServer) + if err != nil { + return nil, false, err } - return u.secure.SecureOutbound(ctx, conn, p) + if isServer { + sconn, err := st.SecureInbound(ctx, conn, p) + return sconn, true, err + } + sconn, err := st.SecureOutbound(ctx, conn, p) + return sconn, false, err } func (u *upgrader) negotiateMuxer(nc net.Conn, isServer bool) (*StreamMuxer, error) { @@ -203,7 +220,7 @@ func (u *upgrader) negotiateMuxer(nc net.Conn, isServer bool) (*StreamMuxer, err var proto string if isServer { - selected, _, err := u.msmuxer.Negotiate(nc) + selected, _, err := u.muxerMuxer.Negotiate(nc) if err != nil { return nil, err } @@ -279,3 +296,51 @@ func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server b return "", nil, ctx.Err() } } + +func (u *upgrader) getSecurityByID(id string) sec.SecureTransport { + for _, s := range u.security { + if string(s.ID()) == id { + return s + } + } + return nil +} + +func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, bool, error) { + type result struct { + proto string + iamserver bool + err error + } + + done := make(chan result, 1) + go func() { + if server { + var r result + r.iamserver = true + r.proto, _, r.err = u.securityMuxer.Negotiate(insecure) + done <- r + return + } + var r result + r.proto, r.iamserver, r.err = mss.SelectWithSimopenOrFail(u.securityIDs, insecure) + done <- r + }() + + select { + case r := <-done: + if r.err != nil { + return nil, false, r.err + } + if s := u.getSecurityByID(r.proto); s != nil { + return s, r.iamserver, nil + } + return nil, false, fmt.Errorf("selected unknown security transport: %s", r.proto) + case <-ctx.Done(): + // We *must* do this. We have outstanding work on the connection + // and it's no longer safe to use. + insecure.Close() + <-done // wait to stop using the connection. + return nil, false, ctx.Err() + } +} diff --git a/p2p/net/upgrader/upgrader_test.go b/p2p/net/upgrader/upgrader_test.go index ccbd6ffac3..fbe9527aba 100644 --- a/p2p/net/upgrader/upgrader_test.go +++ b/p2p/net/upgrader/upgrader_test.go @@ -2,6 +2,7 @@ package upgrader_test import ( "context" + "crypto/rand" "errors" "net" "testing" @@ -11,8 +12,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec/insecure" - "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/net/upgrader" @@ -39,12 +40,18 @@ func createUpgraderWithOpts(t *testing.T, opts ...upgrader.Option) (peer.ID, tra return createUpgraderWithMuxers(t, []upgrader.StreamMuxer{{ID: "negotiate", Muxer: &negotiatingMuxer{}}}, nil, nil, opts...) } -func createUpgraderWithMuxers(t *testing.T, muxers []upgrader.StreamMuxer, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...upgrader.Option) (peer.ID, transport.Upgrader) { - priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) +func newPeer(t *testing.T) (peer.ID, crypto.PrivKey) { + t.Helper() + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) id, err := peer.IDFromPrivateKey(priv) require.NoError(t, err) - u, err := upgrader.New(&MuxAdapter{tpt: insecure.NewWithIdentity(insecure.ID, id, priv)}, muxers, nil, rcmgr, connGater, opts...) + return id, priv +} + +func createUpgraderWithMuxers(t *testing.T, muxers []upgrader.StreamMuxer, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...upgrader.Option) (peer.ID, transport.Upgrader) { + id, priv := newPeer(t) + u, err := upgrader.New([]sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, priv)}, muxers, nil, rcmgr, connGater, opts...) require.NoError(t, err) return id, u } @@ -194,8 +201,4 @@ func TestOutboundResourceManagement(t *testing.T) { _, err := dial(t, dialUpgrader, ln.Multiaddrs()[0], id, connScope) require.Error(t, err) }) - - t.Run("blocked by the resource manager", func(t *testing.T) { - - }) } diff --git a/p2p/transport/tcp/tcp_test.go b/p2p/transport/tcp/tcp_test.go index b320c5a40b..1dacc71952 100644 --- a/p2p/transport/tcp/tcp_test.go +++ b/p2p/transport/tcp/tcp_test.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p/core/sec/insecure" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite" @@ -148,13 +147,11 @@ func TestTcpTransportCantListenUtp(t *testing.T) { envReuseportVal = true } -func makeInsecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) { +func makeInsecureMuxer(t *testing.T) (peer.ID, []sec.SecureTransport) { t.Helper() priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 256) require.NoError(t, err) id, err := peer.IDFromPrivateKey(priv) require.NoError(t, err) - var secMuxer csms.SSMuxer - secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, priv)) - return id, &secMuxer + return id, []sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, priv)} } diff --git a/p2p/transport/websocket/websocket_test.go b/p2p/transport/websocket/websocket_test.go index 051c8064b6..0b05f94298 100644 --- a/p2p/transport/websocket/websocket_test.go +++ b/p2p/transport/websocket/websocket_test.go @@ -27,7 +27,6 @@ import ( "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/core/transport" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" "github.com/libp2p/go-libp2p/p2p/security/noise" ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite" @@ -56,22 +55,16 @@ func newSecureUpgrader(t *testing.T) (peer.ID, transport.Upgrader) { return id, u } -func newInsecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) { +func newInsecureMuxer(t *testing.T) (peer.ID, []sec.SecureTransport) { t.Helper() priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) id, err := peer.IDFromPrivateKey(priv) - if err != nil { - t.Fatal(err) - } - var secMuxer csms.SSMuxer - secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, priv)) - return id, &secMuxer + require.NoError(t, err) + return id, []sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, priv)} } -func newSecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) { +func newSecureMuxer(t *testing.T) (peer.ID, []sec.SecureTransport) { t.Helper() priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) if err != nil { @@ -81,11 +74,9 @@ func newSecureMuxer(t *testing.T) (peer.ID, sec.SecureMuxer) { if err != nil { t.Fatal(err) } - var secMuxer csms.SSMuxer noiseTpt, err := noise.New(noise.ID, priv, nil) require.NoError(t, err) - secMuxer.AddTransport(noise.ID, noiseTpt) - return id, &secMuxer + return id, []sec.SecureTransport{noiseTpt} } func lastComponent(t *testing.T, a ma.Multiaddr) ma.Multiaddr { From 83b4e3cf9c6409b283e609aa74c8b77b3bdde486 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 20 Nov 2022 18:36:01 +1300 Subject: [PATCH 4/4] expose the security protocol on the ConnectionState --- core/network/conn.go | 8 ++++---- p2p/net/upgrader/conn.go | 8 ++++++-- p2p/net/upgrader/upgrader.go | 11 ++++++----- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/network/conn.go b/core/network/conn.go index 550b20c27f..279621146e 100644 --- a/core/network/conn.go +++ b/core/network/conn.go @@ -34,12 +34,12 @@ type Conn interface { GetStreams() []Stream } -// ConnectionState holds extra information releated to the ConnSecurity entity. +// ConnectionState holds information about the connection. type ConnectionState struct { - // The next protocol used for stream muxer selection. This is derived from - // security protocol handshake, for example, Noise handshake payload or - // TLS/ALPN negotiation. + // The stream multiplexer used on this connection (if any). StreamMultiplexer string + // The security protocol used on this connection (if any). + Security string } // ConnSecurity is the interface that one can mix into a connection interface to diff --git a/p2p/net/upgrader/conn.go b/p2p/net/upgrader/conn.go index e3c87547cd..4fdbd05fa8 100644 --- a/p2p/net/upgrader/conn.go +++ b/p2p/net/upgrader/conn.go @@ -16,7 +16,8 @@ type transportConn struct { scope network.ConnManagementScope stat network.ConnStats - muxer protocol.ID + muxer protocol.ID + security protocol.ID } var _ transport.CapableConn = &transportConn{} @@ -54,5 +55,8 @@ func (t *transportConn) Close() error { } func (t *transportConn) ConnState() network.ConnectionState { - return network.ConnectionState{StreamMultiplexer: string(t.muxer)} + return network.ConnectionState{ + StreamMultiplexer: string(t.muxer), + Security: string(t.security), + } } diff --git a/p2p/net/upgrader/upgrader.go b/p2p/net/upgrader/upgrader.go index 5c15417d46..5a69efb0bd 100644 --- a/p2p/net/upgrader/upgrader.go +++ b/p2p/net/upgrader/upgrader.go @@ -152,7 +152,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma return nil, ipnet.ErrNotInPrivateNetwork } - sconn, server, err := u.setupSecurity(ctx, conn, p, dir) + sconn, security, server, err := u.setupSecurity(ctx, conn, p, dir) if err != nil { conn.Close() return nil, fmt.Errorf("failed to negotiate security protocol: %s", err) @@ -193,24 +193,25 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma stat: stat, scope: connScope, muxer: muxer, + security: security, } return tc, nil } -func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, dir network.Direction) (sec.SecureConn, bool, error) { +func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, dir network.Direction) (sec.SecureConn, protocol.ID, bool, error) { isServer := dir == network.DirInbound var st sec.SecureTransport var err error st, isServer, err = u.negotiateSecurity(ctx, conn, isServer) if err != nil { - return nil, false, err + return nil, "", false, err } if isServer { sconn, err := st.SecureInbound(ctx, conn, p) - return sconn, true, err + return sconn, st.ID(), true, err } sconn, err := st.SecureOutbound(ctx, conn, p) - return sconn, false, err + return sconn, st.ID(), false, err } func (u *upgrader) negotiateMuxer(nc net.Conn, isServer bool) (*StreamMuxer, error) {