Skip to content

Commit

Permalink
relay: make only 1 reservation per peer
Browse files Browse the repository at this point in the history
We only need 1 reservation per peer.

This PR also proactively removes relay reservations on disconnecting
from the peer. This helps when the relay client is restarting
frequently.
  • Loading branch information
sukunrt committed Sep 23, 2024
1 parent 9038a72 commit 17569ef
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 72 deletions.
90 changes: 49 additions & 41 deletions p2p/protocol/circuitv2/relay/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package relay

import (
"errors"
"slices"
"sync"
"time"

Expand All @@ -12,46 +13,48 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)

var validity = 30 * time.Minute

var (
errTooManyReservations = errors.New("too many reservations")
errTooManyReservationsForPeer = errors.New("too many reservations for peer")
errTooManyReservationsForIP = errors.New("too many peers for IP address")
errTooManyReservationsForASN = errors.New("too many peers for ASN")
errTooManyReservations = errors.New("too many reservations")
errTooManyReservationsForIP = errors.New("too many peers for IP address")
errTooManyReservationsForASN = errors.New("too many peers for ASN")
)

type peerWithExpiry struct {
Expiry time.Time
Peer peer.ID
}

// constraints implements various reservation constraints
type constraints struct {
rc *Resources

mutex sync.Mutex
total []time.Time
peers map[peer.ID][]time.Time
ips map[string][]time.Time
asns map[uint32][]time.Time
total []peerWithExpiry
ips map[string][]peerWithExpiry
asns map[uint32][]peerWithExpiry
}

// newConstraints creates a new constraints object.
// The methods are *not* thread-safe; an external lock must be held if synchronization
// is required.
func newConstraints(rc *Resources) *constraints {
return &constraints{
rc: rc,
peers: make(map[peer.ID][]time.Time),
ips: make(map[string][]time.Time),
asns: make(map[uint32][]time.Time),
rc: rc,
ips: make(map[string][]peerWithExpiry),
asns: make(map[uint32][]peerWithExpiry),
}
}

// AddReservation adds a reservation for a given peer with a given multiaddr.
// If adding this reservation violates IP constraints, an error is returned.
func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
func (c *constraints) Reserve(p peer.ID, a ma.Multiaddr, expiry time.Time) error {
c.mutex.Lock()
defer c.mutex.Unlock()

now := time.Now()
c.cleanup(now)
// To handle refreshes correctly, remove the existing reservation for the peer.
c.cleanupPeer(p)

if len(c.total) >= c.rc.MaxReservations {
return errTooManyReservations
Expand All @@ -62,17 +65,12 @@ func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
return errors.New("no IP address associated with peer")
}

peerReservations := c.peers[p]
if len(peerReservations) >= c.rc.MaxReservationsPerPeer {
return errTooManyReservationsForPeer
}

ipReservations := c.ips[ip.String()]
if len(ipReservations) >= c.rc.MaxReservationsPerIP {
return errTooManyReservationsForIP
}

var asnReservations []time.Time
var asnReservations []peerWithExpiry
var asn uint32
if ip.To4() == nil {
asn = asnutil.AsnForIPv6(ip)
Expand All @@ -84,42 +82,52 @@ func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error {
}
}

expiry := now.Add(validity)
c.total = append(c.total, expiry)

peerReservations = append(peerReservations, expiry)
c.peers[p] = peerReservations
c.total = append(c.total, peerWithExpiry{Expiry: expiry, Peer: p})

ipReservations = append(ipReservations, expiry)
ipReservations = append(ipReservations, peerWithExpiry{Expiry: expiry, Peer: p})
c.ips[ip.String()] = ipReservations

if asn != 0 {
asnReservations = append(asnReservations, expiry)
asnReservations = append(asnReservations, peerWithExpiry{Expiry: expiry, Peer: p})
c.asns[asn] = asnReservations
}
return nil
}

func (c *constraints) cleanupList(l []time.Time, now time.Time) []time.Time {
var index int
for i, t := range l {
if t.After(now) {
break
func (c *constraints) cleanup(now time.Time) {
expireFunc := func(pe peerWithExpiry) bool {
return pe.Expiry.Before(now)
}
c.total = slices.DeleteFunc(c.total, expireFunc)
for k, ipReservations := range c.ips {
c.ips[k] = slices.DeleteFunc(ipReservations, expireFunc)
if len(c.ips[k]) == 0 {
delete(c.ips, k)
}
}
for k, asnReservations := range c.asns {
c.asns[k] = slices.DeleteFunc(asnReservations, expireFunc)
if len(c.asns[k]) == 0 {
delete(c.asns, k)
}
index = i + 1
}
return l[index:]
}

func (c *constraints) cleanup(now time.Time) {
c.total = c.cleanupList(c.total, now)
for k, peerReservations := range c.peers {
c.peers[k] = c.cleanupList(peerReservations, now)
func (c *constraints) cleanupPeer(p peer.ID) {
removeFunc := func(pe peerWithExpiry) bool {
return pe.Peer == p
}
c.total = slices.DeleteFunc(c.total, removeFunc)
for k, ipReservations := range c.ips {
c.ips[k] = c.cleanupList(ipReservations, now)
c.ips[k] = slices.DeleteFunc(ipReservations, removeFunc)
if len(c.ips[k]) == 0 {
delete(c.ips, k)
}
}
for k, asnReservations := range c.asns {
c.asns[k] = c.cleanupList(asnReservations, now)
c.asns[k] = slices.DeleteFunc(asnReservations, removeFunc)
if len(c.asns[k]) == 0 {
delete(c.asns, k)
}
}
}
49 changes: 26 additions & 23 deletions p2p/protocol/circuitv2/relay/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,40 @@ func TestConstraints(t *testing.T) {
}
}
const limit = 7
expiry := time.Now().Add(30 * time.Minute)

t.Run("total reservations", func(t *testing.T) {
res := infResources()
res.MaxReservations = limit
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != errTooManyReservations {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
})

t.Run("reservations per peer", func(t *testing.T) {
t.Run("updates reservations on the same peer", func(t *testing.T) {
p := test.RandPeerIDFatal(t)
p2 := test.RandPeerIDFatal(t)
res := infResources()
res.MaxReservationsPerPeer = limit
res.MaxReservationsPerIP = 1
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(p, randomIPv4Addr(t)); err != nil {
t.Fatal(err)
}

ipAddr := randomIPv4Addr(t)
if err := c.Reserve(p, ipAddr, expiry); err != nil {
t.Fatal(err)
}
if err := c.AddReservation(p, randomIPv4Addr(t)); err != errTooManyReservationsForPeer {
if err := c.Reserve(p2, ipAddr, expiry); err != errTooManyReservationsForIP {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(p, randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected to update reservation for peer, got %v", err)
}
if err := c.Reserve(p2, ipAddr, expiry); err != nil {
t.Fatalf("expected reservation for different peer to be possible, got %v", err)
}
})
Expand All @@ -73,14 +78,14 @@ func TestConstraints(t *testing.T) {
res.MaxReservationsPerIP = limit
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), ip, expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != errTooManyReservationsForIP {
if err := c.Reserve(test.RandPeerIDFatal(t), ip, expiry); err != errTooManyReservationsForIP {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected reservation for different IP to be possible, got %v", err)
}
})
Expand All @@ -101,25 +106,23 @@ func TestConstraints(t *testing.T) {
const ipv6Prefix = "2a03:2880:f003:c07:face:b00c::"
for i := 0; i < limit; i++ {
addr := getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, i+1)))
if err := c.AddReservation(test.RandPeerIDFatal(t), addr); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), addr, expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42)))); err != errTooManyReservationsForASN {
if err := c.Reserve(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42))), expiry); err != errTooManyReservationsForASN {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected reservation for different IP to be possible, got %v", err)
}
})
}

func TestConstraintsCleanup(t *testing.T) {
origValidity := validity
defer func() { validity = origValidity }()
validity = 500 * time.Millisecond

const limit = 7
validity := 500 * time.Millisecond
expiry := time.Now().Add(validity)
res := &Resources{
MaxReservations: limit,
MaxReservationsPerPeer: math.MaxInt32,
Expand All @@ -128,16 +131,16 @@ func TestConstraintsCleanup(t *testing.T) {
}
c := newConstraints(res)
for i := 0; i < limit; i++ {
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatal(err)
}
}
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != errTooManyReservations {
t.Fatalf("expected to run into total reservation limit, got %v", err)
}

time.Sleep(validity + time.Millisecond)
if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil {
if err := c.Reserve(test.RandPeerIDFatal(t), randomIPv4Addr(t), expiry); err != nil {
t.Fatalf("expected old reservations to have been garbage collected, %v", err)
}
}
15 changes: 7 additions & 8 deletions p2p/protocol/circuitv2/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,16 @@ func (r *Relay) handleReserve(s network.Stream) pbv2.Status {
return pbv2.Status_PERMISSION_DENIED
}
now := time.Now()
expire := now.Add(r.rc.ReservationTTL)

_, exists := r.rsvp[p]
if !exists {
if err := r.constraints.AddReservation(p, a); err != nil {
r.mx.Unlock()
log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err)
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
return pbv2.Status_RESERVATION_REFUSED
}
if err := r.constraints.Reserve(p, a, expire); err != nil {
r.mx.Unlock()
log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err)
r.handleError(s, pbv2.Status_RESERVATION_REFUSED)
return pbv2.Status_RESERVATION_REFUSED
}

expire := now.Add(r.rc.ReservationTTL)
r.rsvp[p] = expire
r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight)
r.mx.Unlock()
Expand Down Expand Up @@ -673,6 +671,7 @@ func (r *Relay) disconnected(n network.Network, c network.Conn) {
if ok {
delete(r.rsvp, p)
}
r.constraints.cleanupPeer(p)
r.mx.Unlock()

if ok && r.metricsTracer != nil {
Expand Down
2 changes: 2 additions & 0 deletions p2p/protocol/circuitv2/relay/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Resources struct {

// MaxReservationsPerPeer is the maximum number of reservations originating from the same
// peer; default is 4.
//
// Deprecated: We only need 1 reservation per peer.
MaxReservationsPerPeer int
// MaxReservationsPerIP is the maximum number of reservations originating from the same
// IP address; default is 8.
Expand Down

0 comments on commit 17569ef

Please sign in to comment.