Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve query capabilities #932

Merged
merged 5 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 121 additions & 22 deletions v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)

Expand Down Expand Up @@ -112,8 +111,8 @@
// between both automatically (see ModeOpt).
Mode ModeOpt

// Kademlia holds the configuration of the underlying Kademlia implementation.
Kademlia *coord.CoordinatorConfig
// Query holds the configuration used for queries managed by the DHT.
Query *QueryConfig

// BucketSize determines the number of closer peers to return
BucketSize int
Expand All @@ -132,7 +131,7 @@
// [triert.TrieRT] routing table will be used. This field will be nil
// in the default configuration because a routing table requires information
// about the local node.
RoutingTable routing.RoutingTableCpl[kadt.Key, kadt.PeerID]
RoutingTable kadt.RoutingTable

// The Backends field holds a map of key namespaces to their corresponding
// backend implementation. For example, if we received an IPNS record, the
Expand Down Expand Up @@ -193,7 +192,6 @@
return &Config{
Clock: clock.New(),
Mode: ModeOptAutoClient,
Kademlia: coord.DefaultCoordinatorConfig(),
BucketSize: 20, // MAGIC
BootstrapPeers: DefaultBootstrapPeers(),
ProtocolID: ProtocolIPFS,
Expand All @@ -205,6 +203,7 @@
AddressFilter: AddrFilterPrivate,
MeterProvider: otel.GetMeterProvider(),
TracerProvider: otel.GetTracerProvider(),
Query: DefaultQueryConfig(),
}
}

Expand Down Expand Up @@ -242,62 +241,104 @@
return fmt.Errorf("invalid mode option: %s", c.Mode)
}

if c.Kademlia == nil {
return fmt.Errorf("kademlia configuration must not be nil")
if c.Query == nil {
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("query configuration must not be nil"),
}
}

if err := c.Kademlia.Validate(); err != nil {
return fmt.Errorf("invalid kademlia configuration: %w", err)
if err := c.Query.Validate(); err != nil {
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("invalid query configuration: %w", err),
}
}

if c.BucketSize == 0 {
return fmt.Errorf("bucket size must not be 0")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("bucket size must not be 0"),
}
}

if len(c.BootstrapPeers) == 0 {
return fmt.Errorf("no bootstrap peer")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("no bootstrap peer"),
}
}

if c.ProtocolID == "" {
return fmt.Errorf("protocolID must not be empty")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("protocolID must not be empty"),
}
}

if c.Logger == nil {
return fmt.Errorf("logger must not be nil")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("logger must not be nil"),
}
}

if c.TimeoutStreamIdle <= 0 {
return fmt.Errorf("stream idle timeout must be a positive duration")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("stream idle timeout must be a positive duration"),
}
}

if c.ProtocolID == ProtocolIPFS && len(c.Backends) != 0 {
if len(c.Backends) != 3 {
return fmt.Errorf("ipfs protocol requires exactly three backends")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("ipfs protocol requires exactly three backends"),
}
}

if _, found := c.Backends[namespaceIPNS]; !found {
return fmt.Errorf("ipfs protocol requires an IPNS backend")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("ipfs protocol requires an IPNS backend"),
}

Check warning on line 305 in v2/config.go

View check run for this annotation

Codecov / codecov/patch

v2/config.go#L302-L305

Added lines #L302 - L305 were not covered by tests
}

if _, found := c.Backends[namespacePublicKey]; !found {
return fmt.Errorf("ipfs protocol requires a public key backend")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("ipfs protocol requires a public key backend"),
}

Check warning on line 312 in v2/config.go

View check run for this annotation

Codecov / codecov/patch

v2/config.go#L309-L312

Added lines #L309 - L312 were not covered by tests
}

if _, found := c.Backends[namespaceProviders]; !found {
return fmt.Errorf("ipfs protocol requires a providers backend")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("ipfs protocol requires a providers backend"),
}

Check warning on line 319 in v2/config.go

View check run for this annotation

Codecov / codecov/patch

v2/config.go#L316-L319

Added lines #L316 - L319 were not covered by tests
}
}

if c.AddressFilter == nil {
return fmt.Errorf("address filter must not be nil - use AddrFilterIdentity to disable filtering")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("address filter must not be nil - use AddrFilterIdentity to disable filtering"),
}
}

if c.MeterProvider == nil {
return fmt.Errorf("opentelemetry meter provider must not be nil")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("opentelemetry meter provider must not be nil"),
}
}

if c.TracerProvider == nil {
return fmt.Errorf("opentelemetry tracer provider must not be nil")
return &ConfigurationError{
Component: "Config",
Err: fmt.Errorf("opentelemetry tracer provider must not be nil"),
}
}

return nil
Expand All @@ -322,3 +363,61 @@
func AddrFilterPublic(maddrs []ma.Multiaddr) []ma.Multiaddr {
return ma.FilterAddrs(maddrs, func(maddr ma.Multiaddr) bool { return !manet.IsIPLoopback(maddr) })
}

// QueryConfig contains the configuration options for queries managed by a [DHT].
type QueryConfig struct {
// Concurrency defines the maximum number of in-flight queries that may be waiting for message responses at any one time.
Concurrency int

// Timeout defines the time to wait before terminating a query that is not making progress
Timeout time.Duration

// RequestConcurrency defines the maximum number of concurrent requests that each query may have in flight.
// The maximum number of concurrent requests is equal to [RequestConcurrency] multiplied by [Concurrency].
RequestConcurrency int

// RequestTimeout defines the time to wait before terminating a request to a node that has not responded.
RequestTimeout time.Duration
}

// DefaultQueryConfig returns the default query configuration options for a DHT.
func DefaultQueryConfig() *QueryConfig {
return &QueryConfig{
Concurrency: 3, // MAGIC
Timeout: 5 * time.Minute, // MAGIC
RequestConcurrency: 3, // MAGIC
RequestTimeout: time.Minute, // MAGIC
}
}

// Validate checks the configuration options and returns an error if any have invalid values.
func (cfg *QueryConfig) Validate() error {
if cfg.Concurrency < 1 {
return &ConfigurationError{
Component: "QueryConfig",
Err: fmt.Errorf("concurrency must be greater than zero"),
}
}
if cfg.Timeout < 1 {
return &ConfigurationError{
Component: "QueryConfig",
Err: fmt.Errorf("timeout must be greater than zero"),
}
}

if cfg.RequestConcurrency < 1 {
return &ConfigurationError{
Component: "QueryConfig",
Err: fmt.Errorf("request concurrency must be greater than zero"),
}
}

if cfg.RequestTimeout < 1 {
return &ConfigurationError{
Component: "QueryConfig",
Err: fmt.Errorf("request timeout must be greater than zero"),
}
}

return nil
}
51 changes: 47 additions & 4 deletions v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ func TestConfig_Validate(t *testing.T) {
assert.Error(t, cfg.Validate())
})

t.Run("nil Kademlia configuration", func(t *testing.T) {
t.Run("nil Query configuration", func(t *testing.T) {
cfg := DefaultConfig()
cfg.Kademlia = nil
cfg.Query = nil
assert.Error(t, cfg.Validate())
})

t.Run("invalid Kademlia configuration", func(t *testing.T) {
t.Run("invalid Query configuration", func(t *testing.T) {
cfg := DefaultConfig()
cfg.Kademlia.Clock = nil
cfg.Query.Concurrency = -1
assert.Error(t, cfg.Validate())
})

Expand Down Expand Up @@ -114,3 +114,46 @@ func TestConfig_Validate(t *testing.T) {
assert.Error(t, cfg.Validate())
})
}

func TestQueryConfig_Validate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg := DefaultQueryConfig()
assert.NoError(t, cfg.Validate())
})

t.Run("concurrency positive", func(t *testing.T) {
cfg := DefaultQueryConfig()

cfg.Concurrency = 0
assert.Error(t, cfg.Validate())
cfg.Concurrency = -1
assert.Error(t, cfg.Validate())
})

t.Run("timeout positive", func(t *testing.T) {
cfg := DefaultQueryConfig()

cfg.Timeout = 0
assert.Error(t, cfg.Validate())
cfg.Timeout = -1
assert.Error(t, cfg.Validate())
})

t.Run("request concurrency positive", func(t *testing.T) {
cfg := DefaultQueryConfig()

cfg.RequestConcurrency = 0
assert.Error(t, cfg.Validate())
cfg.RequestConcurrency = -1
assert.Error(t, cfg.Validate())
})

t.Run("request timeout positive", func(t *testing.T) {
cfg := DefaultQueryConfig()

cfg.RequestTimeout = 0
assert.Error(t, cfg.Validate())
cfg.RequestTimeout = -1
assert.Error(t, cfg.Validate())
})
}
12 changes: 8 additions & 4 deletions v2/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
"github.com/plprobelab/go-kademlia/key"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/routing"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)

Expand Down Expand Up @@ -97,7 +97,7 @@
} else if cfg.ProtocolID == ProtocolIPFS {
d.backends, err = d.initAminoBackends()
if err != nil {
return nil, fmt.Errorf("init amino backends: %w", err)

Check warning on line 100 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L100

Added line #L100 was not covered by tests
}
}

Expand All @@ -107,12 +107,16 @@
}

// instantiate a new Kademlia DHT coordinator.
coordCfg := cfg.Kademlia
coordCfg := coord.DefaultCoordinatorConfig()
coordCfg.QueryConcurrency = cfg.Query.Concurrency
coordCfg.QueryTimeout = cfg.Query.Timeout
coordCfg.RequestConcurrency = cfg.Query.RequestConcurrency
coordCfg.RequestTimeout = cfg.Query.RequestTimeout
coordCfg.Clock = cfg.Clock
coordCfg.MeterProvider = cfg.MeterProvider
coordCfg.TracerProvider = cfg.TracerProvider

d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), &Router{host: h, ProtocolID: cfg.ProtocolID}, d.rt, coordCfg)
d.kad, err = coord.NewCoordinator(kadt.PeerID(d.host.ID()), &router{host: h, ProtocolID: cfg.ProtocolID}, d.rt, coordCfg)
if err != nil {
return nil, fmt.Errorf("new coordinator: %w", err)
}
Expand Down Expand Up @@ -150,18 +154,18 @@
)

if d.cfg.Datastore != nil {
dstore = d.cfg.Datastore

Check warning on line 157 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L157

Added line #L157 was not covered by tests
} else if dstore, err = InMemoryDatastore(); err != nil {
return nil, fmt.Errorf("new default datastore: %w", err)
}

Check warning on line 160 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L159-L160

Added lines #L159 - L160 were not covered by tests

// wrap datastore in open telemetry tracing
dstore = trace.New(dstore, d.tele.Tracer)

pbeCfg, err := DefaultProviderBackendConfig()
if err != nil {
return nil, fmt.Errorf("default provider config: %w", err)
}

Check warning on line 168 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L167-L168

Added lines #L167 - L168 were not covered by tests
pbeCfg.Logger = d.cfg.Logger
pbeCfg.AddressFilter = d.cfg.AddressFilter
pbeCfg.Tele = d.tele
Expand All @@ -169,26 +173,26 @@

pbe, err := NewBackendProvider(d.host.Peerstore(), dstore, pbeCfg)
if err != nil {
return nil, fmt.Errorf("new provider backend: %w", err)
}

Check warning on line 177 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L176-L177

Added lines #L176 - L177 were not covered by tests

rbeCfg, err := DefaultRecordBackendConfig()
if err != nil {
return nil, fmt.Errorf("default provider config: %w", err)
}

Check warning on line 182 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L181-L182

Added lines #L181 - L182 were not covered by tests
rbeCfg.Logger = d.cfg.Logger
rbeCfg.Tele = d.tele
rbeCfg.clk = d.cfg.Clock

ipnsBe, err := NewBackendIPNS(dstore, d.host.Peerstore(), rbeCfg)
if err != nil {
return nil, fmt.Errorf("new ipns backend: %w", err)
}

Check warning on line 190 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L189-L190

Added lines #L189 - L190 were not covered by tests

pkBe, err := NewBackendPublicKey(dstore, rbeCfg)
if err != nil {
return nil, fmt.Errorf("new public key backend: %w", err)
}

Check warning on line 195 in v2/dht.go

View check run for this annotation

Codecov / codecov/patch

v2/dht.go#L194-L195

Added lines #L194 - L195 were not covered by tests

return map[string]Backend{
namespaceIPNS: ipnsBe,
Expand Down
2 changes: 1 addition & 1 deletion v2/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)
Expand Down
22 changes: 22 additions & 0 deletions v2/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package dht

import "fmt"

// A ConfigurationError is returned when a component's configuration is found to be invalid or unusable.
type ConfigurationError struct {
Component string
Err error
}

var _ error = (*ConfigurationError)(nil)

func (e *ConfigurationError) Error() string {
if e.Err == nil {
return fmt.Sprintf("configuration error: %s", e.Component)
}

Check warning on line 16 in v2/errors.go

View check run for this annotation

Codecov / codecov/patch

v2/errors.go#L15-L16

Added lines #L15 - L16 were not covered by tests
return fmt.Sprintf("configuration error: %s: %s", e.Component, e.Err.Error())
}

func (e *ConfigurationError) Unwrap() error {
return e.Err

Check warning on line 21 in v2/errors.go

View check run for this annotation

Codecov / codecov/patch

v2/errors.go#L20-L21

Added lines #L20 - L21 were not covered by tests
}
File renamed without changes.
File renamed without changes.
Loading
Loading