Skip to content

Commit

Permalink
p2p-circuit v2 (#125)
Browse files Browse the repository at this point in the history
* v2 client scaffolding

* gomod: go-libp2p-core and go-libp2p-transport-upgrader feature dependencies

* Conn implements network.ConnStat

* add reservation stub

* utilities

* dial scaffolding and v1 compat dialing

* stream handling scaffolding and v1 incoming connection handling

* implement hop tagging

* export timeout variables

* v2 protobuf

* v2 client protocol implementation

* implement Reserve

* go get go-libp2p-swarm@feat/transient-conns

* implement client.New

* rework pb status codes

* client responds with UNEXPECTED_MESSAGE when it's actually an unexpected message

* relay scaffolding, reservation implementation

* implement relaying

* implement missing details

* add options for resources/limit

* gc idle conn counts

* fix clown shoes in cancellation check

* end to end relay test

* untag peers with expired reservations

* add time limit test

* better debug log for accepted conns

* add data limit test

* add v2-v1 compatibility tests

* godocs

* add WithACL relay option

* only return public relay addrs in reservation record

* remove the refresh restriction madness

* set default limit Data to 128K

* fix typo in AllowReserve godoc

* fix some small issues

- remove context from constructor
- remove stream handler when closing the host
- remove the awkward cancellation check from handleStream

* fix tests

* address review comments

- Add deadline for Reserve calls
- Add deadline for dials
- Add some comments for things that confuse aarsh.

* humor aarsh and add initializers for slices

* comment nitpicks

* fix bug in slice pre-allocations

* add deadline to connectV1

* make Relay.Close thread-safe

* untag peers with reservations when closing the relay

* gomod: get go-libp2p-asn-util

* add IP/ASN reservation constraints

* gomod: update deps

* fix e2e test

* increase default limit duration to 2min

* update protocol for vouched relay addrs; provide absolute expiration time instead of TTL

* update for reservation changes

* add voucher to the reservation pb

* TODO about reservation vouchers

* deduplicate protocol ID definitions between relay and client

* add reservation vouchers

* emit and consume reservation vouchers

* improve limit data test

* deduplicate concurrent relay dials to the samke peer

* improve dialer deduplication

* add a short timeout to dialing the relay in order to aid deduplication

* gomod: fix go1.16 madness

* spec compliance: don't include p2p-circuit in reservation addrs

* spec compliance: refuse reservation and connection attempts over relayed connections

* test shim: add empty file in test directory

* spec compliance: update protobuf

* spec compliance: use libp2p envelopes for reservation vouchers

* fix staticcheck

Co-authored-by: Marten Seemann <martenseemann@gmail.com>
  • Loading branch information
vyzo and marten-seemann committed Sep 2, 2021
1 parent fd4d2a4 commit 7872bd5
Show file tree
Hide file tree
Showing 32 changed files with 4,843 additions and 2 deletions.
3 changes: 2 additions & 1 deletion examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,9 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052 h1:BM7aaOF7RpmNn9+9g6uTjGJ0cTzWr5j9i9IKeun2M8U=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.4.2 h1:YMp7StMi2dof+baaxkbxaizXjY1RPvU71CXfxExzcUU=
github.com/libp2p/go-libp2p-autonat v0.4.2/go.mod h1:YxaJlpr81FhdOv3W3BTconZPfhaYivRdf53g+S2wobk=
github.com/libp2p/go-libp2p-blankhost v0.2.0 h1:3EsGAi0CBGcZ33GwRuXEYJLLPoVWyXJ1bcJzAJjINkk=
Expand Down
3 changes: 2 additions & 1 deletion examples/ipfs-camp-2019/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,9 @@ github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZ
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052 h1:BM7aaOF7RpmNn9+9g6uTjGJ0cTzWr5j9i9IKeun2M8U=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.4.2 h1:YMp7StMi2dof+baaxkbxaizXjY1RPvU71CXfxExzcUU=
github.com/libp2p/go-libp2p-autonat v0.4.2/go.mod h1:YxaJlpr81FhdOv3W3BTconZPfhaYivRdf53g+S2wobk=
github.com/libp2p/go-libp2p-blankhost v0.2.0 h1:3EsGAi0CBGcZ33GwRuXEYJLLPoVWyXJ1bcJzAJjINkk=
Expand Down
2 changes: 2 additions & 0 deletions examples/pubsub/chat/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU=
github.com/libp2p/go-conn-security-multistream v0.2.1 h1:ft6/POSK7F+vl/2qzegnHDaXFU0iWB4yVTYrioC6Zy0=
github.com/libp2p/go-conn-security-multistream v0.2.1/go.mod h1:cR1d8gA0Hr59Fj6NhaTpFhJZrjSYuNmhpT2r25zYR70=
Expand All @@ -409,6 +410,7 @@ github.com/libp2p/go-eventbus v0.2.1/go.mod h1:jc2S4SoEVPP48H9Wpzm5aiGwUCBMfGhVh
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.4.2 h1:YMp7StMi2dof+baaxkbxaizXjY1RPvU71CXfxExzcUU=
github.com/libp2p/go-libp2p-autonat v0.4.2/go.mod h1:YxaJlpr81FhdOv3W3BTconZPfhaYivRdf53g+S2wobk=
github.com/libp2p/go-libp2p-blankhost v0.2.0 h1:3EsGAi0CBGcZ33GwRuXEYJLLPoVWyXJ1bcJzAJjINkk=
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ require (
github.com/ipfs/go-datastore v0.4.6
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-log/v2 v2.3.0
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-addr-util v0.1.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.1
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a
github.com/libp2p/go-libp2p-autonat v0.4.2
github.com/libp2p/go-libp2p-blankhost v0.2.0
github.com/libp2p/go-libp2p-circuit v0.4.0
Expand All @@ -45,6 +48,7 @@ require (
github.com/multiformats/go-multiaddr v0.4.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/multiformats/go-multistream v0.2.2
github.com/multiformats/go-varint v0.0.6
github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/stretchr/testify v1.7.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc=
github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU=
github.com/libp2p/go-conn-security-multistream v0.2.1 h1:ft6/POSK7F+vl/2qzegnHDaXFU0iWB4yVTYrioC6Zy0=
Expand All @@ -432,6 +434,8 @@ github.com/libp2p/go-libp2p v0.6.1/go.mod h1:CTFnWXogryAHjXAKEbOf1OWY+VeAP3lDMZk
github.com/libp2p/go-libp2p v0.7.0/go.mod h1:hZJf8txWeCduQRDC/WSqBGMxaTHCOYHt2xSU1ivxn0k=
github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniVO7zIHGMw=
github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-autonat v0.1.1/go.mod h1:OXqkeGOY2xJVWKAGV2inNF5aKN/djNA3fdpCWloIudE=
github.com/libp2p/go-libp2p-autonat v0.2.0/go.mod h1:DX+9teU4pEEoZUqR1PiMlqliONQdNbfzE1C718tcViI=
github.com/libp2p/go-libp2p-autonat v0.2.1/go.mod h1:MWtAhV5Ko1l6QBsHQNSuM6b1sRkXrpk0/LqCr+vCVxI=
Expand Down
66 changes: 66 additions & 0 deletions p2p/host/circuitv2/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package client

import (
"context"
"sync"

"github.com/libp2p/go-libp2p/p2p/host/circuitv2/proto"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

logging "github.com/ipfs/go-log"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
)

var log = logging.Logger("p2p-circuit")

// Client implements the client-side of the p2p-circuit/v2 protocol:
// - it implements dialing through v2 relays
// - it listens for incoming connections through v2 relays.
//
// For backwards compatibility with v1 relays and older nodes, the client will
// also accept relay connections through v1 relays and fallback dial peers using p2p-circuit/v1.
// This allows us to use the v2 code as drop in replacement for v1 in a host without breaking
// existing code and interoperability with older nodes.
type Client struct {
ctx context.Context
host host.Host
upgrader *tptu.Upgrader

incoming chan accept

mx sync.Mutex
activeDials map[peer.ID]*completion
hopCount map[peer.ID]int
}

type accept struct {
conn *Conn
writeResponse func() error
}

type completion struct {
ch chan struct{}
relay peer.ID
err error
}

// New constructs a new p2p-circuit/v2 client, attached to the given host and using the given
// upgrader to perform connection upgrades.
func New(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) (*Client, error) {
return &Client{
ctx: ctx,
host: h,
upgrader: upgrader,
incoming: make(chan accept),
activeDials: make(map[peer.ID]*completion),
hopCount: make(map[peer.ID]int),
}, nil
}

// Start registers the circuit (client) protocol stream handlers
func (c *Client) Start() {
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1)
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2)
}
145 changes: 145 additions & 0 deletions p2p/host/circuitv2/client/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package client

import (
"fmt"
"net"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

// HopTagWeight is the connection manager weight for connections carrying relay hop streams
var HopTagWeight = 5

type statLimitDuration struct{}
type statLimitData struct{}

var (
StatLimitDuration = statLimitDuration{}
StatLimitData = statLimitData{}
)

type Conn struct {
stream network.Stream
remote peer.AddrInfo
stat network.Stat

client *Client
}

type NetAddr struct {
Relay string
Remote string
}

var _ net.Addr = (*NetAddr)(nil)

func (n *NetAddr) Network() string {
return "libp2p-circuit-relay"
}

func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}

// Conn interface
var _ manet.Conn = (*Conn)(nil)

func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}

func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}

func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}

func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}

func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}

func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}

// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
// TODO: We should be able to do this directly without converting to/from a string.
relayAddr, err := ma.NewComponent(
ma.ProtocolWithCode(ma.P_P2P).Name,
c.stream.Conn().RemotePeer().Pretty(),
)
if err != nil {
panic(err)
}
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
}

func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.stream.Conn().LocalMultiaddr()
}

func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
}
return na
}

func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}

// ConnStat interface
var _ network.ConnStat = (*Conn)(nil)

func (c *Conn) Stat() network.Stat {
return c.stat
}

// tagHop tags the underlying relay connection so that it can be (somewhat) protected from the
// connection manager as it is an important connection that proxies other connections.
// This is handled here so that the user code doesnt need to bother with this and avoid
// clown shoes situations where a high value peer connection is behind a relayed connection and it is
// implicitly because the connection manager closed the underlying relay connection.
func (c *Conn) tagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()

p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]++
if c.client.hopCount[p] == 1 {
c.client.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
}
}

// untagHop removes the relay-hop-stream tag if necessary; it is invoked when a relayed connection
// is closed.
func (c *Conn) untagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()

p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]--
if c.client.hopCount[p] == 0 {
c.client.host.ConnManager().UntagPeer(p, "relay-hop-stream")
delete(c.client.hopCount, p)
}
}
Loading

0 comments on commit 7872bd5

Please sign in to comment.