Integrate Zikade/go-kademlia in v2 (#880)
* WIP

* revise protobuf

* remove gogo protobuf dependency

* WIP

* add kadt package

* Add routing test

* add custom zikade dependency

* Import zikade code

* Remove generics from zikade imported code

* Update to latest go-kademlia

* Cleanup naming of events

* Minor naming cleanup

* Change maintainers for v2 while being developed

* remove Zikade dependency

* Consolidate type parameters

* Change config test structure

* use opentelemetry

* use convenience attribute methods

* let coord package use tele

* fix golint warnings

* use clock.Clock

* add telemetry context tests

* Improve telemetry documentation

* fix test race

* fix garbage collection race

* Add AddAddresses method to DHT (#879)

* Add AddAddresses method to DHT

* Add AddAddresses method to DHT

* go mod tidy

* Rename Query Skip errors

* go fmt coordinator.go

* Fix test flakes

* Fix lint errors

---------

Co-authored-by: Ian Davis <18375+iand@users.noreply.github.com>
dennis-tra and iand authored Sep 7, 2023
1 parent 2610f89 commit 54f20b5
Showing 44 changed files with 3,861 additions and 466 deletions.
3 changes: 3 additions & 0 deletions CODEOWNERS
@@ -13,3 +13,6 @@
# records are IPFS specific
/records.go @libp2p/kubo-maintainers @guillaumemichel
/records_test.go @libp2p/kubo-maintainers @guillaumemichel


/v2/ @dennis-tra @iand
1 change: 1 addition & 0 deletions v2/.gitignore
@@ -0,0 +1 @@
github.com
26 changes: 16 additions & 10 deletions v2/backend.go
@@ -2,11 +2,11 @@ package dht

import (
"context"
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/boxo/ipns"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/peerstore"
)
@@ -60,9 +60,11 @@ type Backend interface {
// store and fetch IPNS records from the given datastore. The stored and
// returned records must be of type [*recpb.Record]. The cfg parameter can be
// nil, in which case the [DefaultRecordBackendConfig] will be used.
func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackendConfig) *RecordBackend {
func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackendConfig) (be *RecordBackend, err error) {
if cfg == nil {
cfg = DefaultRecordBackendConfig()
if cfg, err = DefaultRecordBackendConfig(); err != nil {
return nil, fmt.Errorf("default ipns backend config: %w", err)
}
}

return &RecordBackend{
@@ -71,16 +73,18 @@ func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackend
namespace: namespaceIPNS,
datastore: ds,
validator: ipns.Validator{KeyBook: kb},
}
}, nil
}

// NewBackendPublicKey initializes a new backend for the "pk" namespace that can
// store and fetch public key records from the given datastore. The stored and
// returned records must be of type [*recpb.Record]. The cfg parameter can be
// nil, in which case the [DefaultRecordBackendConfig] will be used.
func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) *RecordBackend {
func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) (be *RecordBackend, err error) {
if cfg == nil {
cfg = DefaultRecordBackendConfig()
if cfg, err = DefaultRecordBackendConfig(); err != nil {
return nil, fmt.Errorf("default public key backend config: %w", err)
}
}

return &RecordBackend{
@@ -89,7 +93,7 @@ func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) *RecordBa
namespace: namespacePublicKey,
datastore: ds,
validator: record.PublicKeyValidator{},
}
}, nil
}

// NewBackendProvider initializes a new backend for the "providers" namespace
@@ -98,9 +102,11 @@ func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) *RecordBa
// The values returned from [ProvidersBackend.Fetch] will be of type
// [*providerSet] (unexported). The cfg parameter can be nil, in which case the
// [DefaultProviderBackendConfig] will be used.
func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *ProvidersBackendConfig) (*ProvidersBackend, error) {
func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *ProvidersBackendConfig) (be *ProvidersBackend, err error) {
if cfg == nil {
cfg = DefaultProviderBackendConfig()
if cfg, err = DefaultProviderBackendConfig(); err != nil {
return nil, fmt.Errorf("default provider backend config: %w", err)
}
}

cache, err := lru.New[string, providerSet](cfg.CacheSize)
@@ -114,7 +120,7 @@ func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *Pro
cache: cache,
namespace: namespaceProviders,
addrBook: pstore,
datastore: autobatch.NewAutoBatching(dstore, cfg.BatchSize),
datastore: dstore,
}

return p, nil
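All three constructors above now return an error alongside the backend, because building a default configuration can itself fail (it initializes telemetry via tele.NewWithGlobalProviders). A minimal sketch of calling the new signatures; the helper name and the assumption that a ds.TxnDatastore and a peerstore.KeyBook are available elsewhere are illustrative, not part of this diff:

// setupRecordBackends is a hypothetical helper; tds and kb are assumed
// to be provided by the caller.
func setupRecordBackends(tds ds.TxnDatastore, kb peerstore.KeyBook) error {
	// A nil config falls back to DefaultRecordBackendConfig, which can
	// now fail, hence the added error return value.
	ipnsBE, err := NewBackendIPNS(tds, kb, nil)
	if err != nil {
		return fmt.Errorf("new ipns backend: %w", err)
	}

	pkBE, err := NewBackendPublicKey(tds, nil)
	if err != nil {
		return fmt.Errorf("new public key backend: %w", err)
	}

	_, _ = ipnsBE, pkBE
	return nil
}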
40 changes: 21 additions & 19 deletions v2/backend_provider.go
@@ -6,24 +6,21 @@ import (
"fmt"
"io"
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/benbjohnson/clock"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
dsq "github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-base32"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/metric"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/metrics"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

// ProvidersBackend implements the [Backend] interface and handles provider
@@ -48,8 +45,9 @@ type ProvidersBackend struct {
// fetch peer multiaddresses from (we don't save them in the datastore).
addrBook peerstore.AddrBook

// datastore is where we save the peer IDs providing a certain multihash
datastore *autobatch.Datastore
// datastore is where we save the peer IDs providing a certain multihash.
// The datastore must be thread-safe.
datastore ds.Datastore

// gcSkip is a sync map that marks records as to-be-skipped by the garbage
// collection process. TODO: this is a sub-optimal pattern.
@@ -83,9 +81,6 @@ type ProvidersBackendConfig struct {
// requesting peers' side.
AddressTTL time.Duration

// BatchSize specifies how many provider record writes should be batched
BatchSize int

// CacheSize specifies the LRU cache size
CacheSize int

@@ -95,6 +90,10 @@ type ProvidersBackendConfig struct {
// Logger is the logger to use
Logger *slog.Logger

// Tele holds a reference to the telemetry struct to capture metrics and
// traces.
Tele *tele.Telemetry

// AddressFilter is a filter function that any addresses that we attempt to
// store or fetch from the peerstore's address book need to pass through.
// If you're manually configuring this backend, make sure to align the
@@ -106,17 +105,22 @@ type ProvidersBackendConfig struct {
// configuration. Use this as a starting point and modify it. If a nil
// configuration is passed to [NewBackendProvider], this default configuration
// here is used.
func DefaultProviderBackendConfig() *ProvidersBackendConfig {
func DefaultProviderBackendConfig() (*ProvidersBackendConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

return &ProvidersBackendConfig{
clk: clock.New(),
ProvideValidity: 48 * time.Hour, // empirically measured in: https://github.com/plprobelab/network-measurements/blob/master/results/rfm17-provider-record-liveness.md
AddressTTL: 24 * time.Hour, // MAGIC
BatchSize: 256, // MAGIC
CacheSize: 256, // MAGIC
GCInterval: time.Hour, // MAGIC
Logger: slog.Default(),
Tele: telemetry,
AddressFilter: AddrFilterIdentity, // verify alignment with [Config.AddressFilter]
}
}, nil
}
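The same pattern applies here: DefaultProviderBackendConfig can fail while setting up telemetry, so callers obtain the defaults, adjust fields, and pass the result on. A sketch under the assumption that a peerstore and a thread-safe batching datastore already exist (the backend no longer wraps the datastore in autobatch, so batching and thread safety are the caller's responsibility):

// Hypothetical wiring code; pstore and dstore are assumed inputs.
func setupProviderBackend(pstore peerstore.Peerstore, dstore ds.Batching) (*ProvidersBackend, error) {
	cfg, err := DefaultProviderBackendConfig()
	if err != nil {
		return nil, fmt.Errorf("default provider backend config: %w", err)
	}

	// Override selected defaults; untouched fields keep the values above,
	// e.g. the empirically derived 48h ProvideValidity.
	cfg.GCInterval = 30 * time.Minute
	cfg.Logger = slog.Default().With("component", "providers")

	return NewBackendProvider(pstore, dstore, cfg)
}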

// Store implements the [Backend] interface. In the case of a [ProvidersBackend]
@@ -346,13 +350,11 @@ func (p *ProvidersBackend) collectGarbage(ctx context.Context) {

// trackCacheQuery updates the prometheus metrics about cache hit/miss performance
func (p *ProvidersBackend) trackCacheQuery(ctx context.Context, hit bool) {
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(metrics.KeyCacheHit, strconv.FormatBool(hit)),
tag.Upsert(metrics.KeyRecordType, "provider"),
},
metrics.LRUCache.M(1),
set := tele.FromContext(ctx,
tele.AttrCacheHit(hit),
tele.AttrRecordType("provider"),
)
p.cfg.Tele.LRUCache.Add(ctx, 1, metric.WithAttributeSet(set))
}

// delete is a convenience method to delete the record at the given datastore
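trackCacheQuery above illustrates the commit's OpenCensus-to-OpenTelemetry migration: tag mutators plus stats.RecordWithTags become an attribute set plus a counter call. For context, this is roughly how such a counter is created with the standard OpenTelemetry metric API; a sketch only, the meter name is an assumption, and in this codebase the instrument actually lives on the tele.Telemetry struct:

// Assumed instrument setup, not code from this diff.
meter := otel.Meter("github.com/libp2p/go-libp2p-kad-dht/v2")
lruCache, err := meter.Int64Counter("lru_cache",
	metric.WithDescription("number of LRU cache hits and misses"),
)
if err != nil {
	return err
}

// At a call site, attributes travel as a set, mirroring trackCacheQuery:
set := tele.FromContext(ctx, tele.AttrCacheHit(true), tele.AttrRecordType("provider"))
lruCache.Add(ctx, 1, metric.WithAttributeSet(set))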
18 changes: 11 additions & 7 deletions v2/backend_provider_test.go
@@ -42,7 +42,9 @@ func newBackendProvider(t testing.TB, cfg *ProvidersBackendConfig) *ProvidersBac

func TestProvidersBackend_GarbageCollection(t *testing.T) {
mockClock := clock.NewMock()
cfg := DefaultProviderBackendConfig()
cfg, err := DefaultProviderBackendConfig()
require.NoError(t, err)

cfg.clk = mockClock
cfg.Logger = devnull

@@ -58,28 +60,28 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) {
// write to datastore
dsKey := newDatastoreKey(namespaceProviders, "random-key", string(p.ID))
rec := expiryRecord{expiry: mockClock.Now()}
err := b.datastore.Put(ctx, dsKey, rec.MarshalBinary())
err = b.datastore.Put(ctx, dsKey, rec.MarshalBinary())
require.NoError(t, err)

// write to peerstore
b.addrBook.AddAddrs(p.ID, p.Addrs, time.Hour)

// advance clock half the gc time and check if record is still there
// advance clock half the validity time and check if record is still there
mockClock.Add(cfg.ProvideValidity / 2)

// sync autobatching datastore to have all put/deletes visible
err = b.datastore.Sync(ctx, ds.NewKey(namespaceProviders))
err = b.datastore.Sync(ctx, ds.NewKey(""))
require.NoError(t, err)

// we expect the record to still be there after half the ProvideValidity
_, err = b.datastore.Get(ctx, dsKey)
require.NoError(t, err)

// advance clock another gc time and check if record was GC'd now
// advance clock another time and check if the record was GC'd now
mockClock.Add(cfg.ProvideValidity + cfg.GCInterval)

// sync autobatching datastore to have all put/deletes visible
err = b.datastore.Sync(ctx, ds.NewKey(namespaceProviders))
err = b.datastore.Sync(ctx, ds.NewKey(""))
require.NoError(t, err)

// we expect the record to be GC'd now
@@ -90,7 +92,9 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) {
}

func TestProvidersBackend_GarbageCollection_lifecycle_thread_safe(t *testing.T) {
cfg := DefaultProviderBackendConfig()
cfg, err := DefaultProviderBackendConfig()
require.NoError(t, err)

cfg.Logger = devnull

b := newBackendProvider(t, cfg)
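The GC test leans on the clk field introduced in this commit: the backend reads time from cfg.clk instead of the wall clock, so the test can jump past the validity window deterministically rather than sleeping. The pattern in isolation (a sketch; assertions and datastore setup omitted):

mockClock := clock.NewMock()

cfg, err := DefaultProviderBackendConfig()
require.NoError(t, err)
cfg.clk = mockClock // inject the mock so the backend never reads real time

b := newBackendProvider(t, cfg)

// Advancing the mock clock is what "ages" the records: after this, the
// GC routine observes them as expired and removes them.
mockClock.Add(cfg.ProvideValidity + cfg.GCInterval)
_ = b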
22 changes: 17 additions & 5 deletions v2/backend_record.go
@@ -6,10 +6,13 @@ import (
"fmt"
"time"

"github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

type RecordBackend struct {
@@ -23,15 +26,24 @@ type RecordBackend struct {
var _ Backend = (*RecordBackend)(nil)

type RecordBackendConfig struct {
clk clock.Clock
MaxRecordAge time.Duration
Logger *slog.Logger
Tele *tele.Telemetry
}

func DefaultRecordBackendConfig() *RecordBackendConfig {
func DefaultRecordBackendConfig() (*RecordBackendConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

return &RecordBackendConfig{
clk: clock.New(),
Logger: slog.Default(),
Tele: telemetry,
MaxRecordAge: 48 * time.Hour, // empirically measured in: https://github.com/plprobelab/network-measurements/blob/master/results/rfm17-provider-record-liveness.md
}
}, nil
}

func (r *RecordBackend) Store(ctx context.Context, key string, value any) (any, error) {
@@ -59,7 +71,7 @@ func (r *RecordBackend) Store(ctx context.Context, key string, value any) (any,
}

// avoid storing arbitrary data, so overwrite that field
rec.TimeReceived = time.Now().UTC().Format(time.RFC3339Nano)
rec.TimeReceived = r.cfg.clk.Now().UTC().Format(time.RFC3339Nano)

data, err := rec.Marshal()
if err != nil {
@@ -101,7 +113,7 @@ func (r *RecordBackend) Fetch(ctx context.Context, key string) (any, error) {

// validate that we don't serve stale records.
receivedAt, err := time.Parse(time.RFC3339Nano, rec.GetTimeReceived())
if err != nil || time.Since(receivedAt) > r.cfg.MaxRecordAge {
if err != nil || r.cfg.clk.Since(receivedAt) > r.cfg.MaxRecordAge {
errStr := ""
if err != nil {
errStr = err.Error()
Expand All @@ -128,7 +140,7 @@ func (r *RecordBackend) Fetch(ctx context.Context, key string) (any, error) {
// If unmarshalling or validation fails, this function (alongside an error) also
// returns true because the existing record should be replaced.
func (r *RecordBackend) shouldReplaceExistingRecord(ctx context.Context, txn ds.Read, dsKey ds.Key, value []byte) (bool, error) {
ctx, span := tracer.Start(ctx, "DHT.shouldReplaceExistingRecord")
ctx, span := r.cfg.Tele.Tracer.Start(ctx, "RecordBackend.shouldReplaceExistingRecord")
defer span.End()

existingBytes, err := txn.Get(ctx, dsKey)
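backend_record.go gets the same clock injection: Store stamps TimeReceived via cfg.clk, and Fetch checks staleness with clk.Since. That makes record expiry unit-testable; a sketch of the idea (the mock setup is an assumption, the names are from this diff):

cfg, err := DefaultRecordBackendConfig()
require.NoError(t, err)

mockClock := clock.NewMock()
cfg.clk = mockClock

// A record stored "now" is stamped through cfg.clk. Once the mock
// advances past MaxRecordAge, clk.Since(receivedAt) > MaxRecordAge and
// Fetch treats the record as stale.
mockClock.Add(cfg.MaxRecordAge + time.Second)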
23 changes: 16 additions & 7 deletions v2/backend_trace.go
@@ -5,23 +5,32 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
otel "go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace"
)

// tracedBackend wraps a [Backend] with OpenTelemetry tracing directives.
// In [New], all backends configured in [Config] (or the automatically
// configured defaults, if none are given) are wrapped with this
// tracedBackend.
type tracedBackend struct {
namespace string // the namespace the backend operates in. Used as a tracing attribute.
backend Backend // the [Backend] to be traced
namespace string // the namespace the backend operates in. Used as a tracing attribute.
backend Backend // the [Backend] to be traced
tracer trace.Tracer // the tracer to be used
}

var _ Backend = (*tracedBackend)(nil)

func traceWrapBackend(namespace string, backend Backend, tracer trace.Tracer) Backend {
return &tracedBackend{
namespace: namespace,
backend: backend,
tracer: tracer,
}
}

// Store implements the [Backend] interface, forwards the call to the wrapped
// backend and manages the trace span.
func (t *tracedBackend) Store(ctx context.Context, key string, value any) (any, error) {
ctx, span := tracer.Start(ctx, "Store", t.traceAttributes(key))
ctx, span := t.tracer.Start(ctx, "Store", t.traceAttributes(key))
defer span.End()

result, err := t.backend.Store(ctx, key, value)
@@ -36,7 +45,7 @@ func (t *tracedBackend) Store(ctx context.Context, key string, value any) (any,
// Fetch implements the [Backend] interface, forwards the call to the wrapped
// backend and manages the trace span.
func (t *tracedBackend) Fetch(ctx context.Context, key string) (any, error) {
ctx, span := tracer.Start(ctx, "Fetch", t.traceAttributes(key))
ctx, span := t.tracer.Start(ctx, "Fetch", t.traceAttributes(key))
defer span.End()

result, err := t.backend.Fetch(ctx, key)
Expand All @@ -49,6 +58,6 @@ func (t *tracedBackend) Fetch(ctx context.Context, key string) (any, error) {
}

// traceAttributes is a helper to build the trace attributes.
func (t *tracedBackend) traceAttributes(key string) otel.SpanStartEventOption {
return otel.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
func (t *tracedBackend) traceAttributes(key string) trace.SpanStartEventOption {
return trace.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
}
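traceWrapBackend gives [New] a single place to instrument whatever backends end up configured, instead of each backend referencing a package-level tracer. A sketch of wrapping a backend by hand; the telemetry value is assumed to come from tele.NewWithGlobalProviders, as in the default configs above:

telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
	return err
}

inner, err := NewBackendIPNS(tds, kb, nil) // tds, kb as in the earlier sketch
if err != nil {
	return err
}

// Every Store and Fetch on `wrapped` now runs inside a span carrying the
// namespace and key attributes, with errors recorded on the span.
wrapped := traceWrapBackend(namespaceIPNS, inner, telemetry.Tracer)
_ = wrapped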