Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give dht and coordinator their own telemetry instances #891

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions v2/backend_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type ProvidersBackendConfig struct {

// Tele holds a reference to the telemetry struct to capture metrics and
// traces.
Tele *tele.Telemetry
Tele *Telemetry

// AddressFilter is a filter function that any addresses that we attempt to
// store or fetch from the peerstore's address book need to pass through.
Expand All @@ -106,7 +106,7 @@ type ProvidersBackendConfig struct {
// configuration is passed to [NewBackendProvider], this default configuration
// here is used.
func DefaultProviderBackendConfig() (*ProvidersBackendConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
telemetry, err := NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}
Expand Down
6 changes: 2 additions & 4 deletions v2/backend_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

type RecordBackend struct {
Expand All @@ -29,11 +27,11 @@ type RecordBackendConfig struct {
clk clock.Clock
MaxRecordAge time.Duration
Logger *slog.Logger
Tele *tele.Telemetry
Tele *Telemetry
}

func DefaultRecordBackendConfig() (*RecordBackendConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
telemetry, err := NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}
Expand Down
7 changes: 4 additions & 3 deletions v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/plprobelab/go-kademlia/coord"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/routing"
Expand All @@ -21,6 +20,8 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
)

// ServiceName is used to scope incoming streams for the resource manager.
Expand Down Expand Up @@ -112,7 +113,7 @@ type Config struct {
Mode ModeOpt

// Kademlia holds the configuration of the underlying Kademlia implementation.
Kademlia *coord.Config
Kademlia *coord.CoordinatorConfig

// BucketSize determines the number of closer peers to return
BucketSize int
Expand Down Expand Up @@ -182,7 +183,7 @@ func DefaultConfig() *Config {
return &Config{
Clock: clock.New(),
Mode: ModeOptAutoClient,
Kademlia: coord.DefaultConfig(),
Kademlia: coord.DefaultCoordinatorConfig(),
BucketSize: 20, // MAGIC
ProtocolID: ProtocolIPFS,
RoutingTable: nil, // nil because a routing table requires information about the local node. triert.TrieRT will be used if this field is nil.
Expand Down
64 changes: 38 additions & 26 deletions v2/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
"github.com/plprobelab/go-kademlia/network/address"
"github.com/plprobelab/go-kademlia/query"
"github.com/plprobelab/go-kademlia/routing"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap/exp/zapslog"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

// A Coordinator coordinates the state machines that comprise a Kademlia DHT
Expand Down Expand Up @@ -51,6 +52,9 @@

// queryBehaviour is the behaviour responsible for running user-submitted queries
queryBehaviour Behaviour[BehaviourEvent, BehaviourEvent]

// tele provides tracing and metric reporting capabilities
tele *Telemetry
}

type CoordinatorConfig struct {
Expand All @@ -64,8 +68,10 @@
RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight
RequestTimeout time.Duration // the timeout queries should use for contacting a single node

Logger *slog.Logger // a structured logger that should be used when logging.
Tele *tele.Telemetry // a struct holding a reference to various metric counters/histograms and a tracer
Logger *slog.Logger // a structured logger that should be used when logging.

MeterProvider metric.MeterProvider // the meter provider to use when initialising metric instruments
TracerProvider trace.TracerProvider // the tracer provider to use when initialising tracing
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand Down Expand Up @@ -111,22 +117,24 @@
}
}

if cfg.Tele == nil {
if cfg.MeterProvider == nil {
return &kaderr.ConfigurationError{
Component: "CoordinatorConfig",
Err: fmt.Errorf("meter provider must not be nil"),
}
}

if cfg.TracerProvider == nil {
return &kaderr.ConfigurationError{
Component: "CoordinatorConfig",
Err: fmt.Errorf("telemetry must not be nil"),
Err: fmt.Errorf("tracer provider must not be nil"),
}
}

return nil
}

func DefaultCoordinatorConfig() (*CoordinatorConfig, error) {
telemetry, err := tele.NewWithGlobalProviders()
if err != nil {
return nil, fmt.Errorf("new telemetry: %w", err)
}

func DefaultCoordinatorConfig() *CoordinatorConfig {
return &CoordinatorConfig{
Clock: clock.New(),
PeerstoreTTL: 10 * time.Minute,
Expand All @@ -135,21 +143,24 @@
RequestConcurrency: 3,
RequestTimeout: time.Minute,
Logger: slog.New(zapslog.NewHandler(logging.Logger("coord").Desugar().Core())),
Tele: telemetry,
}, nil
MeterProvider: otel.GetMeterProvider(),
TracerProvider: otel.GetTracerProvider(),
}
}

func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, kad.NodeID[KadKey]], cfg *CoordinatorConfig) (*Coordinator, error) {
if cfg == nil {
c, err := DefaultCoordinatorConfig()
if err != nil {
return nil, fmt.Errorf("default config: %w", err)
}
cfg = c
cfg = DefaultCoordinatorConfig()

Check warning on line 153 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L153

Added line #L153 was not covered by tests
} else if err := cfg.Validate(); err != nil {
return nil, err
}

// initialize a new telemetry struct
tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider)
if err != nil {
return nil, fmt.Errorf("init telemetry: %w", err)
}

Check warning on line 162 in v2/coord/coordinator.go

View check run for this annotation

Codecov / codecov/patch

v2/coord/coordinator.go#L161-L162

Added lines #L161 - L162 were not covered by tests

qpCfg := query.DefaultPoolConfig()
qpCfg.Clock = cfg.Clock
qpCfg.Concurrency = cfg.QueryConcurrency
Expand All @@ -161,7 +172,7 @@
if err != nil {
return nil, fmt.Errorf("query pool: %w", err)
}
queryBehaviour := NewPooledQueryBehaviour(qp, cfg.Logger, cfg.Tele.Tracer)
queryBehaviour := NewPooledQueryBehaviour(qp, cfg.Logger, tele.Tracer)

bootstrapCfg := routing.DefaultBootstrapConfig[KadKey, ma.Multiaddr]()
bootstrapCfg.Clock = cfg.Clock
Expand Down Expand Up @@ -199,14 +210,15 @@
return nil, fmt.Errorf("probe: %w", err)
}

routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, cfg.Tele.Tracer)
routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, tele.Tracer)

networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, cfg.Tele.Tracer)
networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer)

ctx, cancel := context.WithCancel(context.Background())

d := &Coordinator{
self: self,
tele: tele,
cfg: *cfg,
rtr: rtr,
rt: rt,
Expand Down Expand Up @@ -248,7 +260,7 @@
}

func (c *Coordinator) eventLoop(ctx context.Context) {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.eventLoop")
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.eventLoop")
defer span.End()
for {
var ev BehaviourEvent
Expand All @@ -272,7 +284,7 @@
}

func (c *Coordinator) dispatchEvent(ctx context.Context, ev BehaviourEvent) {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.dispatchEvent", trace.WithAttributes(attribute.String("event_type", fmt.Sprintf("%T", ev))))
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.dispatchEvent", trace.WithAttributes(attribute.String("event_type", fmt.Sprintf("%T", ev))))
defer span.End()

switch ev := ev.(type) {
Expand Down Expand Up @@ -335,7 +347,7 @@

// Query traverses the DHT calling fn for each node visited.
func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (QueryStats, error) {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.Query")
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query")
defer span.End()

ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -419,7 +431,7 @@
// If the routing table is updated as a result of this operation an EventRoutingUpdated notification
// is emitted on the routing notification channel.
func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.AddNodes")
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.AddNodes")
defer span.End()
for _, ai := range ais {
if ai.ID == c.self {
Expand All @@ -441,7 +453,7 @@

// Bootstrap instructs the dht to begin bootstrapping the routing table.
func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.ID) error {
ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.Bootstrap")
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Bootstrap")
defer span.End()
c.routingBehaviour.Notify(ctx, &EventStartBootstrap{
// Bootstrap state machine uses the message
Expand Down
50 changes: 21 additions & 29 deletions v2/coord/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kad-dht/v2/tele"
)

const peerstoreTTL = 10 * time.Minute
Expand Down Expand Up @@ -75,8 +74,8 @@ func (w *notificationWatcher) Expect(ctx context.Context, expected RoutingNotifi
}

// TracingTelemetry may be used to create a Telemetry that traces a test
func TracingTelemetry(t *testing.T) *tele.Telemetry {
telemetry, err := tele.New(otel.GetMeterProvider(), kadtest.JaegerTracerProvider(t))
func TracingTelemetry(t *testing.T) *Telemetry {
telemetry, err := NewTelemetry(otel.GetMeterProvider(), kadtest.JaegerTracerProvider(t))
if err != nil {
t.Fatalf("unexpected error creating telemetry: %v", err)
}
Expand All @@ -86,23 +85,20 @@ func TracingTelemetry(t *testing.T) *tele.Telemetry {

func TestConfigValidate(t *testing.T) {
t.Run("default is valid", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

require.NoError(t, cfg.Validate())
})

t.Run("clock is not nil", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

cfg.Clock = nil
require.Error(t, cfg.Validate())
})

t.Run("query concurrency positive", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

cfg.QueryConcurrency = 0
require.Error(t, cfg.Validate())
Expand All @@ -111,8 +107,7 @@ func TestConfigValidate(t *testing.T) {
})

t.Run("query timeout positive", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

cfg.QueryTimeout = 0
require.Error(t, cfg.Validate())
Expand All @@ -121,8 +116,7 @@ func TestConfigValidate(t *testing.T) {
})

t.Run("request concurrency positive", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

cfg.RequestConcurrency = 0
require.Error(t, cfg.Validate())
Expand All @@ -131,8 +125,7 @@ func TestConfigValidate(t *testing.T) {
})

t.Run("request timeout positive", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

cfg.RequestTimeout = 0
require.Error(t, cfg.Validate())
Expand All @@ -141,18 +134,21 @@ func TestConfigValidate(t *testing.T) {
})

t.Run("logger not nil", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
cfg := DefaultCoordinatorConfig()

cfg.Logger = nil
require.Error(t, cfg.Validate())
})

t.Run("telemetry not nil", func(t *testing.T) {
cfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
t.Run("meter provider not nil", func(t *testing.T) {
cfg := DefaultCoordinatorConfig()
cfg.MeterProvider = nil
require.Error(t, cfg.Validate())
})

cfg.Tele = nil
t.Run("tracer provider not nil", func(t *testing.T) {
cfg := DefaultCoordinatorConfig()
cfg.TracerProvider = nil
require.Error(t, cfg.Validate())
})
}
Expand All @@ -163,8 +159,7 @@ func TestExhaustiveQuery(t *testing.T) {
clk := clock.NewMock()
_, nodes, err := nettest.LinearTopology(4, clk)
require.NoError(t, err)
ccfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
ccfg := DefaultCoordinatorConfig()

ccfg.Clock = clk
ccfg.PeerstoreTTL = peerstoreTTL
Expand Down Expand Up @@ -203,8 +198,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t *testing.T) {
_, nodes, err := nettest.LinearTopology(4, clk)
require.NoError(t, err)

ccfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
ccfg := DefaultCoordinatorConfig()

ccfg.Clock = clk
ccfg.PeerstoreTTL = peerstoreTTL
Expand Down Expand Up @@ -265,8 +259,7 @@ func TestBootstrap(t *testing.T) {
_, nodes, err := nettest.LinearTopology(4, clk)
require.NoError(t, err)

ccfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
ccfg := DefaultCoordinatorConfig()

ccfg.Clock = clk
ccfg.PeerstoreTTL = peerstoreTTL
Expand Down Expand Up @@ -318,8 +311,7 @@ func TestIncludeNode(t *testing.T) {
_, nodes, err := nettest.LinearTopology(4, clk)
require.NoError(t, err)

ccfg, err := DefaultCoordinatorConfig()
require.NoError(t, err)
ccfg := DefaultCoordinatorConfig()

ccfg.Clock = clk
ccfg.PeerstoreTTL = peerstoreTTL
Expand Down
Loading
Loading