Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Zikade/go-kademlia in v2 #880

Merged
merged 30 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f88eb9e
WIP
dennis-tra Aug 28, 2023
3898f39
revise protobuf
dennis-tra Aug 29, 2023
d31e9c9
remove gogo protobuf dependency
dennis-tra Aug 30, 2023
d2cfda6
WIP
dennis-tra Aug 30, 2023
f2f22a3
add kadt package
dennis-tra Aug 30, 2023
7e8f1df
Add routing test
dennis-tra Aug 30, 2023
95520ac
add custom zikade dependency
dennis-tra Aug 30, 2023
b2dbd5a
Merge branch 'v2-develop' into zikade-import
dennis-tra Aug 30, 2023
fc2d370
Merge branch 'v2-develop' into zikade-import
dennis-tra Sep 1, 2023
d0c67d3
Import zikade code
iand Sep 1, 2023
55fac36
Remove generics from zikade imported code
iand Sep 4, 2023
7add530
Update to latest go-kademlia
iand Sep 4, 2023
25bcc7f
Cleanup naming of events
iand Sep 4, 2023
fbe3f44
Minor naming cleanup
iand Sep 4, 2023
300e402
Merge pull request #875 from libp2p/zikade-import-id
iand Sep 5, 2023
dca5c5d
Change maintainers for v2 while being developed
iand Sep 5, 2023
afa8051
remove Zikade dependency
dennis-tra Sep 5, 2023
79c7009
Consolidate type parameters
dennis-tra Sep 5, 2023
1d7cdea
Change config test structure
dennis-tra Sep 5, 2023
e8d82f1
use opentelemetry
dennis-tra Sep 6, 2023
165b00b
use convenience attribute methods
dennis-tra Sep 6, 2023
413aefd
let coord package use tele
dennis-tra Sep 6, 2023
53821dd
fix golint warnings
dennis-tra Sep 6, 2023
05e2d90
use clock.Clock
dennis-tra Sep 6, 2023
5d7df1c
add telemetry context tests
dennis-tra Sep 6, 2023
a749dcf
Improve telemetry documentation
dennis-tra Sep 6, 2023
885bd82
Merge branch 'v2-develop' into zikade-import
dennis-tra Sep 7, 2023
5fa755b
fix test race
dennis-tra Sep 7, 2023
6170d5d
fix garbage collection race
dennis-tra Sep 7, 2023
46ff621
Add AddAddresses method to DHT (#879)
iand Sep 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@
# records are IPFS specific
/records.go @libp2p/kubo-maintainers @guillaumemichel
/records_test.go @libp2p/kubo-maintainers @guillaumemichel


/v2/ @dennis-tra @iand
1 change: 1 addition & 0 deletions v2/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
github.com
26 changes: 16 additions & 10 deletions v2/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package dht

import (
"context"
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/boxo/ipns"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/peerstore"
)
Expand Down Expand Up @@ -60,9 +60,11 @@ type Backend interface {
// store and fetch IPNS records from the given datastore. The stored and
// returned records must be of type [*recpb.Record]. The cfg parameter can be
// nil, in which case the [DefaultRecordBackendConfig] will be used.
func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackendConfig) *RecordBackend {
func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackendConfig) (be *RecordBackend, err error) {
if cfg == nil {
cfg = DefaultRecordBackendConfig()
if cfg, err = DefaultRecordBackendConfig(); err != nil {
return nil, fmt.Errorf("default ipns backend config: %w", err)
}
}

return &RecordBackend{
Expand All @@ -71,16 +73,18 @@ func NewBackendIPNS(ds ds.TxnDatastore, kb peerstore.KeyBook, cfg *RecordBackend
namespace: namespaceIPNS,
datastore: ds,
validator: ipns.Validator{KeyBook: kb},
}
}, nil
}

// NewBackendPublicKey initializes a new backend for the "pk" namespace that can
// store and fetch public key records from the given datastore. The stored and
// returned records must be of type [*recpb.Record]. The cfg parameter can be
// nil, in which case the [DefaultRecordBackendConfig] will be used.
func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) *RecordBackend {
func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) (be *RecordBackend, err error) {
if cfg == nil {
cfg = DefaultRecordBackendConfig()
if cfg, err = DefaultRecordBackendConfig(); err != nil {
return nil, fmt.Errorf("default public key backend config: %w", err)
}
}

return &RecordBackend{
Expand All @@ -89,7 +93,7 @@ func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) *RecordBa
namespace: namespacePublicKey,
datastore: ds,
validator: record.PublicKeyValidator{},
}
}, nil
}

// NewBackendProvider initializes a new backend for the "providers" namespace
Expand All @@ -98,9 +102,11 @@ func NewBackendPublicKey(ds ds.TxnDatastore, cfg *RecordBackendConfig) *RecordBa
// The values returned from [ProvidersBackend.Fetch] will be of type
// [*providerSet] (unexported). The cfg parameter can be nil, in which case the
// [DefaultProviderBackendConfig] will be used.
func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *ProvidersBackendConfig) (*ProvidersBackend, error) {
func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *ProvidersBackendConfig) (be *ProvidersBackend, err error) {
if cfg == nil {
cfg = DefaultProviderBackendConfig()
if cfg, err = DefaultProviderBackendConfig(); err != nil {
return nil, fmt.Errorf("default provider backend config: %w", err)
}
}

cache, err := lru.New[string, providerSet](cfg.CacheSize)
Expand All @@ -114,7 +120,7 @@ func NewBackendProvider(pstore peerstore.Peerstore, dstore ds.Batching, cfg *Pro
cache: cache,
namespace: namespaceProviders,
addrBook: pstore,
datastore: autobatch.NewAutoBatching(dstore, cfg.BatchSize),
datastore: dstore,
}

return p, nil
Expand Down
40 changes: 21 additions & 19 deletions v2/backend_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,21 @@ import (
"fmt"
"io"
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/benbjohnson/clock"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/autobatch"
dsq "github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-base32"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/metric"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/metrics"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

// ProvidersBackend implements the [Backend] interface and handles provider
Expand All @@ -48,8 +45,9 @@ type ProvidersBackend struct {
// fetch peer multiaddresses from (we don't save them in the datastore).
addrBook peerstore.AddrBook

// datastore is where we save the peer IDs providing a certain multihash
datastore *autobatch.Datastore
// datastore is where we save the peer IDs providing a certain multihash.
// The datastore must be thread-safe.
datastore ds.Datastore

// gcSkip is a sync map that marks records as to-be-skipped by the garbage
// collection process. TODO: this is a sub-optimal pattern.
Expand Down Expand Up @@ -83,9 +81,6 @@ type ProvidersBackendConfig struct {
// requesting peers' side.
AddressTTL time.Duration

// BatchSize specifies how many provider record writes should be batched
BatchSize int

// CacheSize specifies the LRU cache size
CacheSize int

Expand All @@ -95,6 +90,10 @@ type ProvidersBackendConfig struct {
// Logger is the logger to use
Logger *slog.Logger

// Tele holds a reference to the telemetry struct to capture metrics and
// traces.
Tele *tele.Telemetry

// AddressFilter is a filter function that any addresses that we attempt to
// store or fetch from the peerstore's address book need to pass through.
// If you're manually configuring this backend, make sure to align the
Expand All @@ -106,17 +105,22 @@ type ProvidersBackendConfig struct {
// configuration. Use this as a starting point and modify it. If a nil
// configuration is passed to [NewBackendProvider], this default configuration
// here is used.
func DefaultProviderBackendConfig() *ProvidersBackendConfig {
func DefaultProviderBackendConfig() (*ProvidersBackendConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

return &ProvidersBackendConfig{
clk: clock.New(),
ProvideValidity: 48 * time.Hour, // empirically measured in: https://github.com/plprobelab/network-measurements/blob/master/results/rfm17-provider-record-liveness.md
AddressTTL: 24 * time.Hour, // MAGIC
BatchSize: 256, // MAGIC
CacheSize: 256, // MAGIC
GCInterval: time.Hour, // MAGIC
Logger: slog.Default(),
Tele: telemetry,
AddressFilter: AddrFilterIdentity, // verify alignment with [Config.AddressFilter]
}
}, nil
}

// Store implements the [Backend] interface. In the case of a [ProvidersBackend]
Expand Down Expand Up @@ -346,13 +350,11 @@ func (p *ProvidersBackend) collectGarbage(ctx context.Context) {

// trackCacheQuery updates the prometheus metrics about cache hit/miss performance
func (p *ProvidersBackend) trackCacheQuery(ctx context.Context, hit bool) {
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(metrics.KeyCacheHit, strconv.FormatBool(hit)),
tag.Upsert(metrics.KeyRecordType, "provider"),
},
metrics.LRUCache.M(1),
set := tele.FromContext(ctx,
tele.AttrCacheHit(hit),
tele.AttrRecordType("provider"),
)
p.cfg.Tele.LRUCache.Add(ctx, 1, metric.WithAttributeSet(set))
}

// delete is a convenience method to delete the record at the given datastore
Expand Down
18 changes: 11 additions & 7 deletions v2/backend_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func newBackendProvider(t testing.TB, cfg *ProvidersBackendConfig) *ProvidersBac

func TestProvidersBackend_GarbageCollection(t *testing.T) {
mockClock := clock.NewMock()
cfg := DefaultProviderBackendConfig()
cfg, err := DefaultProviderBackendConfig()
require.NoError(t, err)

cfg.clk = mockClock
cfg.Logger = devnull

Expand All @@ -58,28 +60,28 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) {
// write to datastore
dsKey := newDatastoreKey(namespaceProviders, "random-key", string(p.ID))
rec := expiryRecord{expiry: mockClock.Now()}
err := b.datastore.Put(ctx, dsKey, rec.MarshalBinary())
err = b.datastore.Put(ctx, dsKey, rec.MarshalBinary())
require.NoError(t, err)

// write to peerstore
b.addrBook.AddAddrs(p.ID, p.Addrs, time.Hour)

// advance clock half the gc time and check if record is still there
// advance clock half the validity time and check if record is still there
mockClock.Add(cfg.ProvideValidity / 2)

// sync autobatching datastore to have all put/deletes visible
err = b.datastore.Sync(ctx, ds.NewKey(namespaceProviders))
err = b.datastore.Sync(ctx, ds.NewKey(""))
require.NoError(t, err)

// we expect the record to still be there after half the ProvideValidity
_, err = b.datastore.Get(ctx, dsKey)
require.NoError(t, err)

// advance clock another gc time and check if record was GC'd now
// advance clock another time and check if the record was GC'd now
mockClock.Add(cfg.ProvideValidity + cfg.GCInterval)

// sync autobatching datastore to have all put/deletes visible
err = b.datastore.Sync(ctx, ds.NewKey(namespaceProviders))
err = b.datastore.Sync(ctx, ds.NewKey(""))
require.NoError(t, err)

// we expect the record to be GC'd now
Expand All @@ -90,7 +92,9 @@ func TestProvidersBackend_GarbageCollection(t *testing.T) {
}

func TestProvidersBackend_GarbageCollection_lifecycle_thread_safe(t *testing.T) {
cfg := DefaultProviderBackendConfig()
cfg, err := DefaultProviderBackendConfig()
require.NoError(t, err)

cfg.Logger = devnull

b := newBackendProvider(t, cfg)
Expand Down
22 changes: 17 additions & 5 deletions v2/backend_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"fmt"
"time"

"github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

type RecordBackend struct {
Expand All @@ -23,15 +26,24 @@ type RecordBackend struct {
var _ Backend = (*RecordBackend)(nil)

type RecordBackendConfig struct {
clk clock.Clock
MaxRecordAge time.Duration
Logger *slog.Logger
Tele *tele.Telemetry
}

func DefaultRecordBackendConfig() *RecordBackendConfig {
func DefaultRecordBackendConfig() (*RecordBackendConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

return &RecordBackendConfig{
clk: clock.New(),
Logger: slog.Default(),
Tele: telemetry,
MaxRecordAge: 48 * time.Hour, // empirically measured in: https://github.com/plprobelab/network-measurements/blob/master/results/rfm17-provider-record-liveness.md
}
}, nil
}

func (r *RecordBackend) Store(ctx context.Context, key string, value any) (any, error) {
Expand Down Expand Up @@ -59,7 +71,7 @@ func (r *RecordBackend) Store(ctx context.Context, key string, value any) (any,
}

// avoid storing arbitrary data, so overwrite that field
rec.TimeReceived = time.Now().UTC().Format(time.RFC3339Nano)
rec.TimeReceived = r.cfg.clk.Now().UTC().Format(time.RFC3339Nano)

data, err := rec.Marshal()
if err != nil {
Expand Down Expand Up @@ -101,7 +113,7 @@ func (r *RecordBackend) Fetch(ctx context.Context, key string) (any, error) {

// validate that we don't serve stale records.
receivedAt, err := time.Parse(time.RFC3339Nano, rec.GetTimeReceived())
if err != nil || time.Since(receivedAt) > r.cfg.MaxRecordAge {
if err != nil || r.cfg.clk.Since(receivedAt) > r.cfg.MaxRecordAge {
errStr := ""
if err != nil {
errStr = err.Error()
Expand All @@ -128,7 +140,7 @@ func (r *RecordBackend) Fetch(ctx context.Context, key string) (any, error) {
// If unmarshalling or validation fails, this function (alongside an error) also
// returns true because the existing record should be replaced.
func (r *RecordBackend) shouldReplaceExistingRecord(ctx context.Context, txn ds.Read, dsKey ds.Key, value []byte) (bool, error) {
ctx, span := tracer.Start(ctx, "DHT.shouldReplaceExistingRecord")
ctx, span := r.cfg.Tele.Tracer.Start(ctx, "RecordBackend.shouldReplaceExistingRecord")
defer span.End()

existingBytes, err := txn.Get(ctx, dsKey)
Expand Down
23 changes: 16 additions & 7 deletions v2/backend_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,32 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
otel "go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace"
)

// tracedBackend wraps a [Backend] in calls to open telemetry tracing
// directives. In [New] all backends configured in [Config] or automatically
// configured if none are given will be wrapped with this tracedBackend.
type tracedBackend struct {
namespace string // the namespace the backend operates in. Used as a tracing attribute.
backend Backend // the [Backend] to be traced
namespace string // the namespace the backend operates in. Used as a tracing attribute.
backend Backend // the [Backend] to be traced
tracer trace.Tracer // the tracer to be used
}

var _ Backend = (*tracedBackend)(nil)

func traceWrapBackend(namespace string, backend Backend, tracer trace.Tracer) Backend {
return &tracedBackend{
namespace: namespace,
backend: backend,
tracer: tracer,
}
}

// Store implements the [Backend] interface, forwards the call to the wrapped
// backend and manages the trace span.
func (t *tracedBackend) Store(ctx context.Context, key string, value any) (any, error) {
ctx, span := tracer.Start(ctx, "Store", t.traceAttributes(key))
ctx, span := t.tracer.Start(ctx, "Store", t.traceAttributes(key))
defer span.End()

result, err := t.backend.Store(ctx, key, value)
Expand All @@ -36,7 +45,7 @@ func (t *tracedBackend) Store(ctx context.Context, key string, value any) (any,
// Fetch implements the [Backend] interface, forwards the call to the wrapped
// backend and manages the trace span.
func (t *tracedBackend) Fetch(ctx context.Context, key string) (any, error) {
ctx, span := tracer.Start(ctx, "Fetch", t.traceAttributes(key))
ctx, span := t.tracer.Start(ctx, "Fetch", t.traceAttributes(key))
defer span.End()

result, err := t.backend.Fetch(ctx, key)
Expand All @@ -49,6 +58,6 @@ func (t *tracedBackend) Fetch(ctx context.Context, key string) (any, error) {
}

// traceAttributes is a helper to build the trace attributes.
func (t *tracedBackend) traceAttributes(key string) otel.SpanStartEventOption {
return otel.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
func (t *tracedBackend) traceAttributes(key string) trace.SpanStartEventOption {
return trace.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
}
Loading
Loading