Skip to content

Commit

Permalink
p2p-circuit v2 (#125)
Browse files Browse the repository at this point in the history
* v2 client scaffolding

* gomod: go-libp2p-core and go-libp2p-transport-upgrader feature dependencies

* Conn implements network.ConnStat

* add reservation stub

* utilities

* dial scaffolding and v1 compat dialing

* stream handling scaffolding and v1 incoming connection handling

* implement hop tagging

* export timeout variables

* v2 protobuf

* v2 client protocol implementation

* implement Reserve

* go get go-libp2p-swarm@feat/transient-conns

* implement client.New

* rework pb status codes

* client responds with UNEXPECTED_MESSAGE when it's actually an unexpected message

* relay scaffolding, reservation implementation

* implement relaying

* implement missing details

* add options for resources/limit

* gc idle conn counts

* fix clown shoes in cancellation check

* end to end relay test

* untag peers with expired reservations

* add time limit test

* better debug log for accepted conns

* add data limit test

* add v2-v1 compatibility tests

* godocs

* add WithACL relay option

* only return public relay addrs in reservation record

* remove the refresh restriction madness

* set default limit Data to 128K

* fix typo in AllowReserve godoc

* fix some small issues

- remove context from constructor
- remove stream handler when closing the host
- remove the awkward cancellation check from handleStream

* fix tests

* address review comments

- Add deadline for Reserve calls
- Add deadline for dials
- Add some comments for things that confuse aarsh.

* humor aarsh and add initializers for slices

* comment nitpicks

* fix bug in slice pre-allocations

* add deadline to connectV1

* make Relay.Close thread-safe

* untag peers with reservations when closing the relay

* gomod: get go-libp2p-asn-util

* add IP/ASN reservation constraints

* gomod: update deps

* fix e2e test

* increase default limit duration to 2min

* update protocol for vouched relay addrs; provide absolute expiration time instead of TTL

* update for reservation changes

* add voucher to the reservation pb

* TODO about reservation vouchers

* deduplicate protocol ID definitions between relay and client

* add reservation vouchers

* emit and consume reservation vouchers

* improve limit data test

* deduplicate concurrent relay dials to the samke peer

* improve dialer deduplication

* add a short timeout to dialing the relay in order to aid deduplication

* gomod: fix go1.16 madness

* spec compliance: don't include p2p-circuit in reservation addrs

* spec compliance: refuse reservation and connection attempts over relayed connections

* test shim: add empty file in test directory

* spec compliance: update protobuf

* spec compliance: use libp2p envelopes for reservation vouchers

* fix staticcheck

Co-authored-by: Marten Seemann <martenseemann@gmail.com>
  • Loading branch information
vyzo and marten-seemann authored Jul 6, 2021
1 parent f572d19 commit 2a77a3d
Show file tree
Hide file tree
Showing 27 changed files with 4,829 additions and 0 deletions.
66 changes: 66 additions & 0 deletions p2p/protocol/internal/circuitv1-deprecated/v2/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package client

import (
"context"
"sync"

"github.com/libp2p/go-libp2p-circuit/v2/proto"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

logging "github.com/ipfs/go-log"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
)

var log = logging.Logger("p2p-circuit")

// Client implements the client-side of the p2p-circuit/v2 protocol:
// - it implements dialing through v2 relays
// - it listens for incoming connections through v2 relays.
//
// For backwards compatibility with v1 relays and older nodes, the client will
// also accept relay connections through v1 relays and fallback dial peers using p2p-circuit/v1.
// This allows us to use the v2 code as drop in replacement for v1 in a host without breaking
// existing code and interoperability with older nodes.
type Client struct {
ctx context.Context
host host.Host
upgrader *tptu.Upgrader

incoming chan accept

mx sync.Mutex
activeDials map[peer.ID]*completion
hopCount map[peer.ID]int
}

type accept struct {
conn *Conn
writeResponse func() error
}

type completion struct {
ch chan struct{}
relay peer.ID
err error
}

// New constructs a new p2p-circuit/v2 client, attached to the given host and using the given
// upgrader to perform connection upgrades.
func New(ctx context.Context, h host.Host, upgrader *tptu.Upgrader) (*Client, error) {
return &Client{
ctx: ctx,
host: h,
upgrader: upgrader,
incoming: make(chan accept),
activeDials: make(map[peer.ID]*completion),
hopCount: make(map[peer.ID]int),
}, nil
}

// Start registers the circuit (client) protocol stream handlers
func (c *Client) Start() {
c.host.SetStreamHandler(proto.ProtoIDv1, c.handleStreamV1)
c.host.SetStreamHandler(proto.ProtoIDv2Stop, c.handleStreamV2)
}
145 changes: 145 additions & 0 deletions p2p/protocol/internal/circuitv1-deprecated/v2/client/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package client

import (
"fmt"
"net"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

// HopTagWeight is the connection manager weight for connections carrying relay hop streams
var HopTagWeight = 5

type statLimitDuration struct{}
type statLimitData struct{}

var (
StatLimitDuration = statLimitDuration{}
StatLimitData = statLimitData{}
)

type Conn struct {
stream network.Stream
remote peer.AddrInfo
stat network.Stat

client *Client
}

type NetAddr struct {
Relay string
Remote string
}

var _ net.Addr = (*NetAddr)(nil)

func (n *NetAddr) Network() string {
return "libp2p-circuit-relay"
}

func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}

// Conn interface
var _ manet.Conn = (*Conn)(nil)

func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}

func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}

func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}

func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}

func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}

func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}

// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
// TODO: We should be able to do this directly without converting to/from a string.
relayAddr, err := ma.NewComponent(
ma.ProtocolWithCode(ma.P_P2P).Name,
c.stream.Conn().RemotePeer().Pretty(),
)
if err != nil {
panic(err)
}
return ma.Join(c.stream.Conn().RemoteMultiaddr(), relayAddr, circuitAddr)
}

func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.stream.Conn().LocalMultiaddr()
}

func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
}
return na
}

func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}

// ConnStat interface
var _ network.ConnStat = (*Conn)(nil)

func (c *Conn) Stat() network.Stat {
return c.stat
}

// tagHop tags the underlying relay connection so that it can be (somewhat) protected from the
// connection manager as it is an important connection that proxies other connections.
// This is handled here so that the user code doesnt need to bother with this and avoid
// clown shoes situations where a high value peer connection is behind a relayed connection and it is
// implicitly because the connection manager closed the underlying relay connection.
func (c *Conn) tagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()

p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]++
if c.client.hopCount[p] == 1 {
c.client.host.ConnManager().TagPeer(p, "relay-hop-stream", HopTagWeight)
}
}

// untagHop removes the relay-hop-stream tag if necessary; it is invoked when a relayed connection
// is closed.
func (c *Conn) untagHop() {
c.client.mx.Lock()
defer c.client.mx.Unlock()

p := c.stream.Conn().RemotePeer()
c.client.hopCount[p]--
if c.client.hopCount[p] == 0 {
c.client.host.ConnManager().UntagPeer(p, "relay-hop-stream")
delete(c.client.hopCount, p)
}
}
Loading

0 comments on commit 2a77a3d

Please sign in to comment.