Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

limit the number of protocols we store per peer #172

Merged
merged 2 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pstoreds/ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

ds "github.com/ipfs/go-datastore"
badger "github.com/ipfs/go-ds-badger"
leveldb "github.com/ipfs/go-ds-leveldb"
Expand All @@ -27,6 +29,18 @@ func TestDsPeerstore(t *testing.T) {
t.Run(name, func(t *testing.T) {
pt.TestPeerstore(t, peerstoreFactory(t, dsFactory, DefaultOpts()))
})

t.Run("protobook limits", func(t *testing.T) {
const limit = 10
opts := DefaultOpts()
opts.MaxProtocols = limit
ds, close := dsFactory(t)
defer close()
ps, err := NewPeerstore(context.Background(), ds, opts)
require.NoError(t, err)
defer ps.Close()
pt.TestPeerstoreProtoStoreLimits(t, ps, limit)
})
}
}

Expand Down
18 changes: 13 additions & 5 deletions pstoreds/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"io"
"time"

base32 "github.com/multiformats/go-base32"
"github.com/multiformats/go-base32"

ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
"github.com/ipfs/go-datastore/query"

peer "github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"
)

Expand All @@ -21,6 +21,9 @@ type Options struct {
// The size of the in-memory cache. A value of 0 or lower disables the cache.
CacheSize uint

// MaxProtocols is the maximum number of protocols we store for one peer.
MaxProtocols int

// Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
// automatically, but it'll be available on demand via explicit calls.
GCPurgeInterval time.Duration
Expand All @@ -37,12 +40,14 @@ type Options struct {
// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
//
// * Cache size: 1024.
// * MaxProtocols: 1024.
// * GC purge interval: 2 hours.
// * GC lookahead interval: disabled.
// * GC initial delay: 60 seconds.
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
MaxProtocols: 1024,
GCPurgeInterval: 2 * time.Hour,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
Expand Down Expand Up @@ -75,7 +80,10 @@ func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (*pstore
return nil, err
}

protoBook := NewProtoBook(peerMetadata)
protoBook, err := NewProtoBook(peerMetadata, WithMaxProtocols(opts.MaxProtocols))
if err != nil {
return nil, err
}

ps := &pstoreds{
Metrics: pstore.NewMetrics(),
Expand Down
45 changes: 36 additions & 9 deletions pstoreds/protobook.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package pstoreds

import (
"errors"
"fmt"
"sync"

peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"

pstore "github.com/libp2p/go-libp2p-core/peerstore"
)
Expand All @@ -19,39 +20,62 @@ func (s *protoSegments) get(p peer.ID) *protoSegment {
return s[byte(p[len(p)-1])]
}

var errTooManyProtocols = errors.New("too many protocols")

type ProtoBookOption func(*dsProtoBook) error

func WithMaxProtocols(num int) ProtoBookOption {
return func(pb *dsProtoBook) error {
pb.maxProtos = num
return nil
}
}

type dsProtoBook struct {
segments protoSegments
meta pstore.PeerMetadata
segments protoSegments
meta pstore.PeerMetadata
maxProtos int
}

var _ pstore.ProtoBook = (*dsProtoBook)(nil)

func NewProtoBook(meta pstore.PeerMetadata) *dsProtoBook {
return &dsProtoBook{
func NewProtoBook(meta pstore.PeerMetadata, opts ...ProtoBookOption) (*dsProtoBook, error) {
pb := &dsProtoBook{
meta: meta,
segments: func() (ret protoSegments) {
for i := range ret {
ret[i] = &protoSegment{}
}
return ret
}(),
maxProtos: 1024,
}

for _, opt := range opts {
if err := opt(pb); err != nil {
return nil, err
}
}
return pb, nil
}

func (pb *dsProtoBook) SetProtocols(p peer.ID, protos ...string) error {
if err := p.Validate(); err != nil {
return err
}

s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
if len(protos) > pb.maxProtos {
return errTooManyProtocols
}

protomap := make(map[string]struct{}, len(protos))
for _, proto := range protos {
protomap[proto] = struct{}{}
}

s := pb.segments.get(p)
s.Lock()
defer s.Unlock()

return pb.meta.Put(p, "protocols", protomap)
}

Expand All @@ -68,6 +92,9 @@ func (pb *dsProtoBook) AddProtocols(p peer.ID, protos ...string) error {
if err != nil {
return err
}
if len(pmap)+len(protos) > pb.maxProtos {
return errTooManyProtocols
}

for _, proto := range protos {
pmap[proto] = struct{}{}
Expand Down
28 changes: 22 additions & 6 deletions pstoremem/inmem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package pstoremem
import (
"testing"

"github.com/stretchr/testify/require"

pstore "github.com/libp2p/go-libp2p-core/peerstore"
pt "github.com/libp2p/go-libp2p-peerstore/test"

Expand All @@ -13,42 +15,56 @@ func TestFuzzInMemoryPeerstore(t *testing.T) {
// Just create and close a bunch of peerstores. If this leaks, we'll
// catch it in the leak check below.
for i := 0; i < 100; i++ {
ps := NewPeerstore()
ps, err := NewPeerstore()
require.NoError(t, err)
ps.Close()
}
}

func TestInMemoryPeerstore(t *testing.T) {
pt.TestPeerstore(t, func() (pstore.Peerstore, func()) {
ps := NewPeerstore()
ps, err := NewPeerstore()
require.NoError(t, err)
return ps, func() { ps.Close() }
})
}

func TestPeerstoreProtoStoreLimits(t *testing.T) {
const limit = 10
ps, err := NewPeerstore(WithMaxProtocols(limit))
require.NoError(t, err)
defer ps.Close()
pt.TestPeerstoreProtoStoreLimits(t, ps, limit)
}

func TestInMemoryAddrBook(t *testing.T) {
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
ps := NewPeerstore()
ps, err := NewPeerstore()
require.NoError(t, err)
return ps, func() { ps.Close() }
})
}

func TestInMemoryKeyBook(t *testing.T) {
pt.TestKeyBook(t, func() (pstore.KeyBook, func()) {
ps := NewPeerstore()
ps, err := NewPeerstore()
require.NoError(t, err)
return ps, func() { ps.Close() }
})
}

func BenchmarkInMemoryPeerstore(b *testing.B) {
pt.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) {
ps := NewPeerstore()
ps, err := NewPeerstore()
require.NoError(b, err)
return ps, func() { ps.Close() }
}, "InMem")
}

func BenchmarkInMemoryKeyBook(b *testing.B) {
pt.BenchmarkKeyBook(b, func() (pstore.KeyBook, func()) {
ps := NewPeerstore()
ps, err := NewPeerstore()
require.NoError(b, err)
return ps, func() { ps.Close() }
})
}
Expand Down
20 changes: 16 additions & 4 deletions pstoremem/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package pstoremem

import (
"fmt"
"io"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"
"io"
)

type pstoremem struct {
Expand All @@ -17,15 +18,26 @@ type pstoremem struct {
*memoryPeerMetadata
}

func WithMaxProtocols(num int) Option {
return func(pb *memoryProtoBook) error {
pb.maxProtos = num
return nil
}
}

// NewPeerstore creates an in-memory threadsafe collection of peers.
func NewPeerstore() *pstoremem {
func NewPeerstore(opts ...Option) (*pstoremem, error) {
pb, err := NewProtoBook(opts...)
if err != nil {
return nil, err
}
return &pstoremem{
Metrics: pstore.NewMetrics(),
memoryKeyBook: NewKeyBook(),
memoryAddrBook: NewAddrBook(),
memoryProtoBook: NewProtoBook(),
memoryProtoBook: pb,
memoryPeerMetadata: NewPeerMetadata(),
}
}, nil
}

func (ps *pstoremem) Close() (err error) {
Expand Down
35 changes: 27 additions & 8 deletions pstoremem/protobook.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package pstoremem

import (
"errors"
"sync"

peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"

pstore "github.com/libp2p/go-libp2p-core/peerstore"
)
Expand All @@ -19,17 +20,23 @@ func (s *protoSegments) get(p peer.ID) *protoSegment {
return s[byte(p[len(p)-1])]
}

var errTooManyProtocols = errors.New("too many protocols")

type Option func(*memoryProtoBook) error

type memoryProtoBook struct {
segments protoSegments

maxProtos int

lk sync.RWMutex
interned map[string]string
}

var _ pstore.ProtoBook = (*memoryProtoBook)(nil)

func NewProtoBook() *memoryProtoBook {
return &memoryProtoBook{
func NewProtoBook(opts ...Option) (*memoryProtoBook, error) {
pb := &memoryProtoBook{
interned: make(map[string]string, 256),
segments: func() (ret protoSegments) {
for i := range ret {
Expand All @@ -39,7 +46,15 @@ func NewProtoBook() *memoryProtoBook {
}
return ret
}(),
maxProtos: 1024,
}

for _, opt := range opts {
if err := opt(pb); err != nil {
return nil, err
}
}
return pb, nil
}

func (pb *memoryProtoBook) internProtocol(proto string) string {
Expand Down Expand Up @@ -70,17 +85,19 @@ func (pb *memoryProtoBook) SetProtocols(p peer.ID, protos ...string) error {
if err := p.Validate(); err != nil {
return err
}

s := pb.segments.get(p)
s.Lock()
defer s.Unlock()
if len(protos) > pb.maxProtos {
return errTooManyProtocols
}

newprotos := make(map[string]struct{}, len(protos))
for _, proto := range protos {
newprotos[pb.internProtocol(proto)] = struct{}{}
}

s := pb.segments.get(p)
s.Lock()
s.protocols[p] = newprotos
s.Unlock()

return nil
}
Expand All @@ -99,11 +116,13 @@ func (pb *memoryProtoBook) AddProtocols(p peer.ID, protos ...string) error {
protomap = make(map[string]struct{})
s.protocols[p] = protomap
}
if len(protomap)+len(protos) > pb.maxProtos {
return errTooManyProtocols
}

for _, proto := range protos {
protomap[pb.internProtocol(proto)] = struct{}{}
}

return nil
}

Expand Down
Loading