diff --git a/v2/relay/constraints.go b/v2/relay/constraints.go new file mode 100644 index 0000000..7c40d29 --- /dev/null +++ b/v2/relay/constraints.go @@ -0,0 +1,187 @@ +package relay + +import ( + crand "crypto/rand" + "encoding/binary" + "errors" + "math/rand" + "sync" + "time" + + asnutil "github.com/libp2p/go-libp2p-asn-util" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var cleanupInterval = 2 * time.Minute +var validity = 30 * time.Minute + +var ( + errTooManyReservations = errors.New("too many reservations") + errTooManyReservationsForPeer = errors.New("too many reservations for peer") + errTooManyReservationsForIP = errors.New("too many peers for IP address") + errTooManyReservationsForASN = errors.New("too many peers for ASN") +) + +// constraints implements various reservation constraints +type constraints struct { + rc *Resources + + closed bool + closing, cleanupRunning chan struct{} + + mutex sync.Mutex + rand rand.Rand + total map[uint64]time.Time + peers map[peer.ID]map[uint64]time.Time + ips map[string]map[uint64]time.Time + asns map[string]map[uint64]time.Time +} + +// NewConstraints creates a new constraints object. +// The methods are *not* thread-safe; an external lock must be held if synchronization +// is required. +func NewConstraints(rc *Resources) *constraints { + b := make([]byte, 8) + crand.Read(b) + random := rand.New(rand.NewSource(int64(binary.BigEndian.Uint64(b)))) + + c := &constraints{ + rc: rc, + closing: make(chan struct{}), + cleanupRunning: make(chan struct{}), + rand: *random, + total: make(map[uint64]time.Time), + peers: make(map[peer.ID]map[uint64]time.Time), + ips: make(map[string]map[uint64]time.Time), + asns: make(map[string]map[uint64]time.Time), + } + go c.cleanup() + return c +} + +// AddReservation adds a reservation for a given peer with a given multiaddr. +// If adding this reservation violates IP constraints, an error is returned. +func (c *constraints) AddReservation(p peer.ID, a ma.Multiaddr) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if len(c.total) >= c.rc.MaxReservations { + return errTooManyReservations + } + + ip, err := manet.ToIP(a) + if err != nil { + return errors.New("no IP address associated with peer") + } + + peerReservations := c.peers[p] + if len(peerReservations) >= c.rc.MaxReservationsPerPeer { + return errTooManyReservationsForPeer + } + + ipStr := ip.String() + ipReservations := c.ips[ipStr] + if len(ipReservations) >= c.rc.MaxReservationsPerIP { + return errTooManyReservationsForIP + } + + var ansReservations map[uint64]time.Time + var asn string + if ip.To4() == nil { + asn, _ = asnutil.Store.AsnForIPv6(ip) + if asn != "" { + ansReservations = c.asns[asn] + if len(ansReservations) >= c.rc.MaxReservationsPerASN { + return errTooManyReservationsForASN + } + } + } + + now := time.Now() + id := c.rand.Uint64() + + c.total[id] = now + + if peerReservations == nil { + peerReservations = make(map[uint64]time.Time) + c.peers[p] = peerReservations + } + peerReservations[id] = now + + if ipReservations == nil { + ipReservations = make(map[uint64]time.Time) + c.ips[ipStr] = ipReservations + } + ipReservations[id] = now + + if asn != "" { + if ansReservations == nil { + ansReservations = make(map[uint64]time.Time) + c.asns[asn] = ansReservations + } + ansReservations[id] = now + } + + return nil +} + +func (c *constraints) cleanup() { + defer close(c.cleanupRunning) + closeChan := c.closing + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + for { + select { + case <-closeChan: + return + case now := <-ticker.C: + c.mutex.Lock() + for id, t := range c.total { + if t.Add(validity).Before(now) { + delete(c.total, id) + } + } + for p, values := range c.peers { + for id, t := range values { + if t.Add(validity).Before(now) { + delete(values, id) + } + } + if len(values) == 0 { + delete(c.peers, p) + } + } + for ip, values := range c.ips { + for id, t := range values { + if t.Add(validity).Before(now) { + delete(values, id) + } + } + if len(values) == 0 { + delete(c.ips, ip) + } + } + for asn, values := range c.asns { + for id, t := range values { + if t.Add(validity).Before(now) { + delete(values, id) + } + } + if len(values) == 0 { + delete(c.asns, asn) + } + } + c.mutex.Unlock() + } + } +} + +func (c *constraints) Close() { + if !c.closed { + close(c.closing) + c.closed = true + <-c.cleanupRunning + } +} diff --git a/v2/relay/constraints_test.go b/v2/relay/constraints_test.go new file mode 100644 index 0000000..47ddaa3 --- /dev/null +++ b/v2/relay/constraints_test.go @@ -0,0 +1,152 @@ +package relay + +import ( + "crypto/rand" + "fmt" + "math" + "net" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/test" + ma "github.com/multiformats/go-multiaddr" +) + +func randomIPv4Addr(t *testing.T) ma.Multiaddr { + t.Helper() + b := make([]byte, 4) + rand.Read(b) + addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/1234", net.IP(b))) + if err != nil { + t.Fatal(err) + } + return addr +} + +func TestConstraints(t *testing.T) { + infResources := func() *Resources { + return &Resources{ + MaxReservations: math.MaxInt32, + MaxReservationsPerPeer: math.MaxInt32, + MaxReservationsPerIP: math.MaxInt32, + MaxReservationsPerASN: math.MaxInt32, + } + } + const limit = 7 + + t.Run("total reservations", func(t *testing.T) { + res := infResources() + res.MaxReservations = limit + c := NewConstraints(res) + defer c.Close() + for i := 0; i < limit; i++ { + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + t.Fatal(err) + } + } + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations { + t.Fatalf("expected to run into total reservation limit, got %v", err) + } + }) + + t.Run("reservations per peer", func(t *testing.T) { + p := test.RandPeerIDFatal(t) + res := infResources() + res.MaxReservationsPerPeer = limit + c := NewConstraints(res) + defer c.Close() + for i := 0; i < limit; i++ { + if err := c.AddReservation(p, randomIPv4Addr(t)); err != nil { + t.Fatal(err) + } + } + if err := c.AddReservation(p, randomIPv4Addr(t)); err != errTooManyReservationsForPeer { + t.Fatalf("expected to run into total reservation limit, got %v", err) + } + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + t.Fatalf("expected reservation for different peer to be possible, got %v", err) + } + }) + + t.Run("reservations per IP", func(t *testing.T) { + ip := randomIPv4Addr(t) + res := infResources() + res.MaxReservationsPerIP = limit + c := NewConstraints(res) + defer c.Close() + for i := 0; i < limit; i++ { + if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != nil { + t.Fatal(err) + } + } + if err := c.AddReservation(test.RandPeerIDFatal(t), ip); err != errTooManyReservationsForIP { + t.Fatalf("expected to run into total reservation limit, got %v", err) + } + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + t.Fatalf("expected reservation for different IP to be possible, got %v", err) + } + }) + + t.Run("reservations per ASN", func(t *testing.T) { + getAddr := func(t *testing.T, ip net.IP) ma.Multiaddr { + t.Helper() + addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/1234", ip)) + if err != nil { + t.Fatal(err) + } + return addr + } + + res := infResources() + res.MaxReservationsPerASN = limit + c := NewConstraints(res) + defer c.Close() + const ipv6Prefix = "2a03:2880:f003:c07:face:b00c::" + for i := 0; i < limit; i++ { + addr := getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, i+1))) + if err := c.AddReservation(test.RandPeerIDFatal(t), addr); err != nil { + t.Fatal(err) + } + } + if err := c.AddReservation(test.RandPeerIDFatal(t), getAddr(t, net.ParseIP(fmt.Sprintf("%s%d", ipv6Prefix, 42)))); err != errTooManyReservationsForASN { + t.Fatalf("expected to run into total reservation limit, got %v", err) + } + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + t.Fatalf("expected reservation for different IP to be possible, got %v", err) + } + }) +} + +func TestConstraintsCleanup(t *testing.T) { + origValidity := validity + origCleanupInterval := cleanupInterval + defer func() { + validity = origValidity + cleanupInterval = origCleanupInterval + }() + validity = 500 * time.Millisecond + cleanupInterval = validity / 10 + + const limit = 7 + res := &Resources{ + MaxReservations: limit, + MaxReservationsPerPeer: math.MaxInt32, + MaxReservationsPerIP: math.MaxInt32, + MaxReservationsPerASN: math.MaxInt32, + } + c := NewConstraints(res) + defer c.Close() + for i := 0; i < limit; i++ { + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + t.Fatal(err) + } + } + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != errTooManyReservations { + t.Fatalf("expected to run into total reservation limit, got %v", err) + } + + time.Sleep(validity + 2*cleanupInterval) + if err := c.AddReservation(test.RandPeerIDFatal(t), randomIPv4Addr(t)); err != nil { + t.Fatalf("expected old reservations to have been garbage collected, %v", err) + } +} diff --git a/v2/relay/ipcs.go b/v2/relay/ipcs.go deleted file mode 100644 index 167e08e..0000000 --- a/v2/relay/ipcs.go +++ /dev/null @@ -1,110 +0,0 @@ -package relay - -import ( - "errors" - "net" - - "github.com/libp2p/go-libp2p-core/peer" - - asnutil "github.com/libp2p/go-libp2p-asn-util" - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" -) - -var ( - ErrNoIP = errors.New("no IP address associated with peer") - ErrTooManyPeersInIP = errors.New("too many peers in IP address") - ErrTooManyPeersInASN = errors.New("too many peers in ASN") -) - -// IPConstraints implements reservation constraints per IP -type IPConstraints struct { - iplimit, asnlimit int - - peers map[peer.ID]net.IP - ips map[string]map[peer.ID]struct{} - asns map[string]map[peer.ID]struct{} -} - -// NewIPConstraints creates a new IPConstraints object. -// The methods are *not* thread-safe; an external lock must be held if synchronization -// is required. -func NewIPConstraints(rc Resources) *IPConstraints { - return &IPConstraints{ - iplimit: rc.MaxReservationsPerIP, - asnlimit: rc.MaxReservationsPerASN, - - peers: make(map[peer.ID]net.IP), - ips: make(map[string]map[peer.ID]struct{}), - asns: make(map[string]map[peer.ID]struct{}), - } -} - -// AddReservation adds a reservation for a given peer with a given multiaddr. -// If adding this reservation violates IP constraints, an error is returned. -func (ipcs *IPConstraints) AddReservation(p peer.ID, a ma.Multiaddr) error { - ip, err := manet.ToIP(a) - if err != nil { - return ErrNoIP - } - - ips := ip.String() - peersInIP := ipcs.ips[ips] - if len(peersInIP) >= ipcs.iplimit { - return ErrTooManyPeersInIP - } - - var peersInAsn map[peer.ID]struct{} - asn, _ := asnutil.Store.AsnForIPv6(ip) - peersInAsn = ipcs.asns[asn] - if len(peersInAsn) >= ipcs.asnlimit { - return ErrTooManyPeersInASN - } - - ipcs.peers[p] = ip - - if peersInIP == nil { - peersInIP = make(map[peer.ID]struct{}) - ipcs.ips[ips] = peersInIP - } - peersInIP[p] = struct{}{} - - if asn != "" { - if peersInAsn == nil { - peersInAsn = make(map[peer.ID]struct{}) - ipcs.asns[asn] = peersInAsn - } - peersInAsn[p] = struct{}{} - } - - return nil -} - -// RemoveReservation removes a peer from the constraints. -func (ipcs *IPConstraints) RemoveReservation(p peer.ID) { - ip, ok := ipcs.peers[p] - if !ok { - return - } - - ips := ip.String() - asn, _ := asnutil.Store.AsnForIPv6(ip) - - delete(ipcs.peers, p) - - peersInIP, ok := ipcs.ips[ips] - if ok { - delete(peersInIP, p) - if len(peersInIP) == 0 { - delete(ipcs.ips, ips) - } - } - - peersInAsn, ok := ipcs.asns[asn] - if ok { - delete(peersInAsn, p) - if len(peersInAsn) == 0 { - delete(ipcs.asns, asn) - } - } -} diff --git a/v2/relay/relay.go b/v2/relay/relay.go index f1fa5ad..48122d4 100644 --- a/v2/relay/relay.go +++ b/v2/relay/relay.go @@ -41,10 +41,10 @@ type Relay struct { ctx context.Context cancel func() - host host.Host - rc Resources - acl ACLFilter - ipcs *IPConstraints + host host.Host + rc Resources + acl ACLFilter + constraints *constraints mx sync.Mutex rsvp map[peer.ID]time.Time @@ -74,7 +74,7 @@ func New(h host.Host, opts ...Option) (*Relay, error) { } } - r.ipcs = NewIPConstraints(r.rc) + r.constraints = NewConstraints(&r.rc) r.selfAddr = ma.StringCast(fmt.Sprintf("/p2p/%s", h.ID())) h.SetStreamHandler(proto.ProtoIDv2Hop, r.handleStream) @@ -95,6 +95,7 @@ func (r *Relay) Close() error { for p := range r.rsvp { r.host.ConnManager().UntagPeer(p, "relay-reservation") } + r.constraints.Close() r.mx.Unlock() } return nil @@ -153,14 +154,7 @@ func (r *Relay) handleReserve(s network.Stream, msg *pbv2.HopMessage) { _, exists := r.rsvp[p] if !exists { - active := len(r.rsvp) - if active >= r.rc.MaxReservations { - r.mx.Unlock() - log.Debugf("refusing relay reservation for %s; too many reservations", p) - r.handleError(s, pbv2.Status_RESERVATION_REFUSED) - return - } - if err := r.ipcs.AddReservation(p, a); err != nil { + if err := r.constraints.AddReservation(p, a); err != nil { r.mx.Unlock() log.Debugf("refusing relay reservation for %s; IP constraint violation: %s", p, err) r.handleError(s, pbv2.Status_RESERVATION_REFUSED) @@ -181,7 +175,6 @@ func (r *Relay) handleReserve(s network.Stream, msg *pbv2.HopMessage) { log.Debugf("error writing reservation response; retracting reservation for %s", p) r.mx.Lock() delete(r.rsvp, p) - r.ipcs.RemoveReservation(p) r.host.ConnManager().UntagPeer(p, "relay-reservation") r.mx.Unlock() } @@ -496,7 +489,6 @@ func (r *Relay) gc() { for p, expire := range r.rsvp { if expire.Before(now) { delete(r.rsvp, p) - r.ipcs.RemoveReservation(p) r.host.ConnManager().UntagPeer(p, "relay-reservation") } } @@ -518,5 +510,4 @@ func (r *Relay) disconnected(n network.Network, c network.Conn) { defer r.mx.Unlock() delete(r.rsvp, p) - r.ipcs.RemoveReservation(p) } diff --git a/v2/relay/resources.go b/v2/relay/resources.go index 345b527..4026a00 100644 --- a/v2/relay/resources.go +++ b/v2/relay/resources.go @@ -20,6 +20,9 @@ type Resources struct { // BufferSize is the size of the relayed connection buffers; defaults to 2048. BufferSize int + // MaxReservationsPerPeer is the maximum number of reservations originating from the same + // peer; default is 8. + MaxReservationsPerPeer int // MaxReservationsPerIP is the maximum number of reservations originating from the same // IP address; default is 4. MaxReservationsPerIP int @@ -48,8 +51,9 @@ func DefaultResources() Resources { MaxCircuits: 16, BufferSize: 2048, - MaxReservationsPerIP: 4, - MaxReservationsPerASN: 32, + MaxReservationsPerPeer: 8, + MaxReservationsPerIP: 4, + MaxReservationsPerASN: 32, } } diff --git a/v2/test/ipcs_test.go b/v2/test/ipcs_test.go deleted file mode 100644 index 09a1a9b..0000000 --- a/v2/test/ipcs_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package test - -import ( - "fmt" - "net" - "testing" - - "github.com/libp2p/go-libp2p-circuit/v2/relay" - - "github.com/libp2p/go-libp2p-core/peer" - - ma "github.com/multiformats/go-multiaddr" -) - -func TestIPConstraints(t *testing.T) { - ipcs := relay.NewIPConstraints(relay.Resources{ - MaxReservationsPerIP: 1, - MaxReservationsPerASN: 2, - }) - - peerA := peer.ID("A") - peerB := peer.ID("B") - peerC := peer.ID("C") - peerD := peer.ID("D") - peerE := peer.ID("E") - - ipA := net.ParseIP("1.2.3.4") - ipB := ipA - ipC := net.ParseIP("2001:200::1") - ipD := net.ParseIP("2001:200::2") - ipE := net.ParseIP("2001:200::3") - - err := ipcs.AddReservation(peerA, ma.StringCast(fmt.Sprintf("/ip4/%s/tcp/1234", ipA))) - if err != nil { - t.Fatal(err) - } - - err = ipcs.AddReservation(peerB, ma.StringCast(fmt.Sprintf("/ip4/%s/tcp/1234", ipB))) - if err != relay.ErrTooManyPeersInIP { - t.Fatalf("unexpected error: %s", err) - } - - ipcs.RemoveReservation(peerA) - err = ipcs.AddReservation(peerB, ma.StringCast(fmt.Sprintf("/ip4/%s/tcp/1234", ipB))) - if err != nil { - t.Fatal(err) - } - - err = ipcs.AddReservation(peerC, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipC))) - if err != nil { - t.Fatal(err) - } - - err = ipcs.AddReservation(peerD, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipD))) - if err != nil { - t.Fatal(err) - } - - err = ipcs.AddReservation(peerE, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipE))) - if err != relay.ErrTooManyPeersInASN { - t.Fatalf("unexpected error: %s", err) - } - - ipcs.RemoveReservation(peerD) - err = ipcs.AddReservation(peerE, ma.StringCast(fmt.Sprintf("/ip6/%s/tcp/1234", ipE))) - if err != nil { - t.Fatal(err) - } -}