diff --git a/v2/backend_provider.go b/v2/backend_provider.go index aee03322..1ddd764f 100644 --- a/v2/backend_provider.go +++ b/v2/backend_provider.go @@ -92,7 +92,7 @@ type ProvidersBackendConfig struct { // Tele holds a reference to the telemetry struct to capture metrics and // traces. - Tele *tele.Telemetry + Tele *Telemetry // AddressFilter is a filter function that any addresses that we attempt to // store or fetch from the peerstore's address book need to pass through. @@ -106,7 +106,7 @@ type ProvidersBackendConfig struct { // configuration is passed to [NewBackendProvider], this default configuration // here is used. func DefaultProviderBackendConfig() (*ProvidersBackendConfig, error) { - telemetry, err := tele.NewWithGlobalProviders() + telemetry, err := NewWithGlobalProviders() if err != nil { return nil, fmt.Errorf("new telemetry: %w", err) } diff --git a/v2/backend_record.go b/v2/backend_record.go index 9655d2b7..ba4a94ba 100644 --- a/v2/backend_record.go +++ b/v2/backend_record.go @@ -11,8 +11,6 @@ import ( record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" "golang.org/x/exp/slog" - - "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) type RecordBackend struct { @@ -29,11 +27,11 @@ type RecordBackendConfig struct { clk clock.Clock MaxRecordAge time.Duration Logger *slog.Logger - Tele *tele.Telemetry + Tele *Telemetry } func DefaultRecordBackendConfig() (*RecordBackendConfig, error) { - telemetry, err := tele.NewWithGlobalProviders() + telemetry, err := NewWithGlobalProviders() if err != nil { return nil, fmt.Errorf("new telemetry: %w", err) } diff --git a/v2/config.go b/v2/config.go index 98136cca..932951d9 100644 --- a/v2/config.go +++ b/v2/config.go @@ -11,7 +11,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" - "github.com/plprobelab/go-kademlia/coord" "github.com/plprobelab/go-kademlia/kad" 
"github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/routing" @@ -21,6 +20,8 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap/exp/zapslog" "golang.org/x/exp/slog" + + "github.com/libp2p/go-libp2p-kad-dht/v2/coord" ) // ServiceName is used to scope incoming streams for the resource manager. @@ -112,7 +113,7 @@ type Config struct { Mode ModeOpt // Kademlia holds the configuration of the underlying Kademlia implementation. - Kademlia *coord.Config + Kademlia *coord.CoordinatorConfig // BucketSize determines the number of closer peers to return BucketSize int @@ -182,7 +183,7 @@ func DefaultConfig() *Config { return &Config{ Clock: clock.New(), Mode: ModeOptAutoClient, - Kademlia: coord.DefaultConfig(), + Kademlia: coord.DefaultCoordinatorConfig(), BucketSize: 20, // MAGIC ProtocolID: ProtocolIPFS, RoutingTable: nil, // nil because a routing table requires information about the local node. triert.TrieRT will be used if this field is nil. diff --git a/v2/coord/coordinator.go b/v2/coord/coordinator.go index 579701a4..ee5d5011 100644 --- a/v2/coord/coordinator.go +++ b/v2/coord/coordinator.go @@ -15,13 +15,14 @@ import ( "github.com/plprobelab/go-kademlia/network/address" "github.com/plprobelab/go-kademlia/query" "github.com/plprobelab/go-kademlia/routing" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/zap/exp/zapslog" "golang.org/x/exp/slog" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" - "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) // A Coordinator coordinates the state machines that comprise a Kademlia DHT @@ -51,6 +52,9 @@ type Coordinator struct { // queryBehaviour is the behaviour responsible for running user-submitted queries queryBehaviour Behaviour[BehaviourEvent, BehaviourEvent] + + // tele provides tracing and metric reporting capabilities + tele *Telemetry } type CoordinatorConfig struct { @@ -64,8 +68,10 @@ type 
CoordinatorConfig struct { RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight RequestTimeout time.Duration // the timeout queries should use for contacting a single node - Logger *slog.Logger // a structured logger that should be used when logging. - Tele *tele.Telemetry // a struct holding a reference to various metric counters/histograms and a tracer + Logger *slog.Logger // a structured logger that should be used when logging. + + MeterProvider metric.MeterProvider // the meter provider to use when initialising metric instruments + TracerProvider trace.TracerProvider // the tracer provider to use when initialising tracing } // Validate checks the configuration options and returns an error if any have invalid values. @@ -111,22 +117,24 @@ func (cfg *CoordinatorConfig) Validate() error { } } - if cfg.Tele == nil { + if cfg.MeterProvider == nil { + return &kaderr.ConfigurationError{ + Component: "CoordinatorConfig", + Err: fmt.Errorf("meter provider must not be nil"), + } + } + + if cfg.TracerProvider == nil { return &kaderr.ConfigurationError{ Component: "CoordinatorConfig", - Err: fmt.Errorf("telemetry must not be nil"), + Err: fmt.Errorf("tracer provider must not be nil"), } } return nil } -func DefaultCoordinatorConfig() (*CoordinatorConfig, error) { - telemetry, err := tele.NewWithGlobalProviders() - if err != nil { - return nil, fmt.Errorf("new telemetry: %w", err) - } - +func DefaultCoordinatorConfig() *CoordinatorConfig { return &CoordinatorConfig{ Clock: clock.New(), PeerstoreTTL: 10 * time.Minute, @@ -135,21 +143,24 @@ func DefaultCoordinatorConfig() (*CoordinatorConfig, error) { RequestConcurrency: 3, RequestTimeout: time.Minute, Logger: slog.New(zapslog.NewHandler(logging.Logger("coord").Desugar().Core())), - Tele: telemetry, - }, nil + MeterProvider: otel.GetMeterProvider(), + TracerProvider: otel.GetTracerProvider(), + } } func NewCoordinator(self peer.ID, rtr Router, rt 
routing.RoutingTableCpl[KadKey, kad.NodeID[KadKey]], cfg *CoordinatorConfig) (*Coordinator, error) { if cfg == nil { - c, err := DefaultCoordinatorConfig() - if err != nil { - return nil, fmt.Errorf("default config: %w", err) - } - cfg = c + cfg = DefaultCoordinatorConfig() } else if err := cfg.Validate(); err != nil { return nil, err } + // initialize a new telemetry struct + tele, err := NewTelemetry(cfg.MeterProvider, cfg.TracerProvider) + if err != nil { + return nil, fmt.Errorf("init telemetry: %w", err) + } + qpCfg := query.DefaultPoolConfig() qpCfg.Clock = cfg.Clock qpCfg.Concurrency = cfg.QueryConcurrency @@ -161,7 +172,7 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, if err != nil { return nil, fmt.Errorf("query pool: %w", err) } - queryBehaviour := NewPooledQueryBehaviour(qp, cfg.Logger, cfg.Tele.Tracer) + queryBehaviour := NewPooledQueryBehaviour(qp, cfg.Logger, tele.Tracer) bootstrapCfg := routing.DefaultBootstrapConfig[KadKey, ma.Multiaddr]() bootstrapCfg.Clock = cfg.Clock @@ -199,14 +210,15 @@ func NewCoordinator(self peer.ID, rtr Router, rt routing.RoutingTableCpl[KadKey, return nil, fmt.Errorf("probe: %w", err) } - routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, cfg.Tele.Tracer) + routingBehaviour := NewRoutingBehaviour(self, bootstrap, include, probe, cfg.Logger, tele.Tracer) - networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, cfg.Tele.Tracer) + networkBehaviour := NewNetworkBehaviour(rtr, cfg.Logger, tele.Tracer) ctx, cancel := context.WithCancel(context.Background()) d := &Coordinator{ self: self, + tele: tele, cfg: *cfg, rtr: rtr, rt: rt, @@ -248,7 +260,7 @@ func (c *Coordinator) RoutingNotifications() <-chan RoutingNotification { } func (c *Coordinator) eventLoop(ctx context.Context) { - ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.eventLoop") + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.eventLoop") defer span.End() for { var ev BehaviourEvent @@ 
-272,7 +284,7 @@ func (c *Coordinator) eventLoop(ctx context.Context) { } func (c *Coordinator) dispatchEvent(ctx context.Context, ev BehaviourEvent) { - ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.dispatchEvent", trace.WithAttributes(attribute.String("event_type", fmt.Sprintf("%T", ev)))) + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.dispatchEvent", trace.WithAttributes(attribute.String("event_type", fmt.Sprintf("%T", ev)))) defer span.End() switch ev := ev.(type) { @@ -335,7 +347,7 @@ func (c *Coordinator) PutValue(ctx context.Context, r Value, q int) error { // Query traverses the DHT calling fn for each node visited. func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (QueryStats, error) { - ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.Query") + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Query") defer span.End() ctx, cancel := context.WithCancel(ctx) @@ -419,7 +431,7 @@ func (c *Coordinator) Query(ctx context.Context, target KadKey, fn QueryFunc) (Q // If the routing table is updated as a result of this operation an EventRoutingUpdated notification // is emitted on the routing notification channel. func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl time.Duration) error { - ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.AddNodes") + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.AddNodes") defer span.End() for _, ai := range ais { if ai.ID == c.self { @@ -441,7 +453,7 @@ func (c *Coordinator) AddNodes(ctx context.Context, ais []peer.AddrInfo, ttl tim // Bootstrap instructs the dht to begin bootstrapping the routing table. 
func (c *Coordinator) Bootstrap(ctx context.Context, seeds []peer.ID) error { - ctx, span := c.cfg.Tele.Tracer.Start(ctx, "Coordinator.Bootstrap") + ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.Bootstrap") defer span.End() c.routingBehaviour.Notify(ctx, &EventStartBootstrap{ // Bootstrap state machine uses the message diff --git a/v2/coord/coordinator_test.go b/v2/coord/coordinator_test.go index ec62bedd..694ea7fc 100644 --- a/v2/coord/coordinator_test.go +++ b/v2/coord/coordinator_test.go @@ -17,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/v2/coord/internal/nettest" "github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" - "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) const peerstoreTTL = 10 * time.Minute @@ -75,8 +74,8 @@ func (w *notificationWatcher) Expect(ctx context.Context, expected RoutingNotifi } // TracingTelemetry may be used to create a Telemetry that traces a test -func TracingTelemetry(t *testing.T) *tele.Telemetry { - telemetry, err := tele.New(otel.GetMeterProvider(), kadtest.JaegerTracerProvider(t)) +func TracingTelemetry(t *testing.T) *Telemetry { + telemetry, err := NewTelemetry(otel.GetMeterProvider(), kadtest.JaegerTracerProvider(t)) if err != nil { t.Fatalf("unexpected error creating telemetry: %v", err) } @@ -86,23 +85,20 @@ func TracingTelemetry(t *testing.T) *tele.Telemetry { func TestConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() require.NoError(t, cfg.Validate()) }) t.Run("clock is not nil", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() cfg.Clock = nil require.Error(t, cfg.Validate()) }) t.Run("query concurrency positive", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() 
cfg.QueryConcurrency = 0 require.Error(t, cfg.Validate()) @@ -111,8 +107,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("query timeout positive", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() cfg.QueryTimeout = 0 require.Error(t, cfg.Validate()) @@ -121,8 +116,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("request concurrency positive", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() cfg.RequestConcurrency = 0 require.Error(t, cfg.Validate()) @@ -131,8 +125,7 @@ func TestConfigValidate(t *testing.T) { }) t.Run("request timeout positive", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() cfg.RequestTimeout = 0 require.Error(t, cfg.Validate()) @@ -141,18 +134,21 @@ func TestConfigValidate(t *testing.T) { }) t.Run("logger not nil", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + cfg := DefaultCoordinatorConfig() cfg.Logger = nil require.Error(t, cfg.Validate()) }) - t.Run("telemetry not nil", func(t *testing.T) { - cfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + t.Run("meter provider not nil", func(t *testing.T) { + cfg := DefaultCoordinatorConfig() + cfg.MeterProvider = nil + require.Error(t, cfg.Validate()) + }) - cfg.Tele = nil + t.Run("tracer provider not nil", func(t *testing.T) { + cfg := DefaultCoordinatorConfig() + cfg.TracerProvider = nil require.Error(t, cfg.Validate()) }) } @@ -163,8 +159,7 @@ func TestExhaustiveQuery(t *testing.T) { clk := clock.NewMock() _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + ccfg := DefaultCoordinatorConfig() ccfg.Clock = clk ccfg.PeerstoreTTL = peerstoreTTL @@ -203,8 +198,7 @@ func TestRoutingUpdatedEventEmittedForCloserNodes(t 
*testing.T) { _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + ccfg := DefaultCoordinatorConfig() ccfg.Clock = clk ccfg.PeerstoreTTL = peerstoreTTL @@ -265,8 +259,7 @@ func TestBootstrap(t *testing.T) { _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + ccfg := DefaultCoordinatorConfig() ccfg.Clock = clk ccfg.PeerstoreTTL = peerstoreTTL @@ -318,8 +311,7 @@ func TestIncludeNode(t *testing.T) { _, nodes, err := nettest.LinearTopology(4, clk) require.NoError(t, err) - ccfg, err := DefaultCoordinatorConfig() - require.NoError(t, err) + ccfg := DefaultCoordinatorConfig() ccfg.Clock = clk ccfg.PeerstoreTTL = peerstoreTTL diff --git a/v2/coord/telemetry.go b/v2/coord/telemetry.go new file mode 100644 index 00000000..11873503 --- /dev/null +++ b/v2/coord/telemetry.go @@ -0,0 +1,28 @@ +package coord + +import ( + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" +) + +// Telemetry is the struct that holds a reference to all metrics and the tracer used +// by the coordinator and its components. +// Make sure to also register the [MeterProviderOpts] with your custom or the global +// [metric.MeterProvider]. +type Telemetry struct { + Tracer trace.Tracer + // TODO: define metrics produced by coordinator +} + +// NewTelemetry initializes a Telemetry struct with the given meter and tracer providers. 
+func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) (*Telemetry, error) { + t := &Telemetry{ + Tracer: tracerProvider.Tracer(tele.TracerName), + } + + // TODO: Initialize metrics produced by the coordinator + + return t, nil +} diff --git a/v2/dht.go b/v2/dht.go index 31787ae5..5191349f 100644 --- a/v2/dht.go +++ b/v2/dht.go @@ -20,7 +20,6 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/v2/coord" "github.com/libp2p/go-libp2p-kad-dht/v2/kadt" - "github.com/libp2p/go-libp2p-kad-dht/v2/tele" ) // DHT is an implementation of Kademlia with S/Kademlia modifications. @@ -58,7 +57,7 @@ type DHT struct { sub event.Subscription // tele holds a reference to a telemetry struct - tele *tele.Telemetry + tele *Telemetry } // New constructs a new [DHT] for the given underlying host and with the given @@ -88,7 +87,7 @@ func New(h host.Host, cfg *Config) (*DHT, error) { } // initialize a new telemetry struct - d.tele, err = tele.New(cfg.MeterProvider, cfg.TracerProvider) + d.tele, err = NewTelemetry(cfg.MeterProvider, cfg.TracerProvider) if err != nil { return nil, fmt.Errorf("init telemetry: %w", err) } @@ -152,11 +151,9 @@ func New(h host.Host, cfg *Config) (*DHT, error) { } // instantiate a new Kademlia DHT coordinator. 
- coordCfg, err := coord.DefaultCoordinatorConfig() - if err != nil { - return nil, fmt.Errorf("new coordinator config: %w", err) - } - coordCfg.Tele = d.tele + coordCfg := coord.DefaultCoordinatorConfig() + coordCfg.MeterProvider = cfg.MeterProvider + coordCfg.TracerProvider = cfg.TracerProvider d.kad, err = coord.NewCoordinator(d.host.ID(), &Router{host: h}, d.rt, coordCfg) if err != nil { diff --git a/v2/tele/tele.go b/v2/tele/tele.go index a3571ffb..5ee01666 100644 --- a/v2/tele/tele.go +++ b/v2/tele/tele.go @@ -2,141 +2,24 @@ package tele import ( "context" - "fmt" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" motel "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/trace" ) // ctxKey is an unexported type alias for the value of a context key. This is // used to attach metric values to a context and get them out of a context. type ctxKey struct{} -var ( - meterName = "github.com/libp2p/go-libp2p-kad-dht/v2" - tracerName = "go-libp2p-kad-dht" - - // attrsCtxKey is the actual context key value that's used as a key for - // metric values that are attached to a context. - attrsCtxKey = ctxKey{} +const ( + MeterName = "github.com/libp2p/go-libp2p-kad-dht/v2" + TracerName = "go-libp2p-kad-dht" ) -// Telemetry is the struct that holds a reference to all metrics and the tracer. -// Initialize this struct with [New] or [NewWithGlobalProviders]. Make sure -// to also register the [MeterProviderOpts] with your custom or the global -// [metric.MeterProvider]. -// -// To see the documentation for each metric below, check out [New] and the -// metric.WithDescription() calls when initializing each metric. 
-type Telemetry struct { - Tracer trace.Tracer - ReceivedMessages metric.Int64Counter - ReceivedMessageErrors metric.Int64Counter - ReceivedBytes metric.Int64Histogram - InboundRequestLatency metric.Float64Histogram - OutboundRequestLatency metric.Float64Histogram - SentMessages metric.Int64Counter - SentMessageErrors metric.Int64Counter - SentRequests metric.Int64Counter - SentRequestErrors metric.Int64Counter - SentBytes metric.Int64Histogram - LRUCache metric.Int64Counter - NetworkSize metric.Int64Counter -} - -// NewWithGlobalProviders uses the global meter and tracer providers from -// opentelemetry. Check out the documentation of [MeterProviderOpts] for -// implications of using this constructor. -func NewWithGlobalProviders() (*Telemetry, error) { - return New(otel.GetMeterProvider(), otel.GetTracerProvider()) -} - -// New initializes a Telemetry struct with the given meter and tracer providers. -// It constructs the different metric counters and histograms. The histograms -// have custom boundaries. Therefore, the given [metric.MeterProvider] should -// have the custom view registered that [MeterProviderOpts] returns. 
-func New(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) (*Telemetry, error) { - var err error - - if meterProvider == nil { - meterProvider = otel.GetMeterProvider() - } - - if tracerProvider == nil { - tracerProvider = otel.GetTracerProvider() - } - - t := &Telemetry{ - Tracer: tracerProvider.Tracer(tracerName), - } - - meter := meterProvider.Meter(meterName) - t.ReceivedMessages, err = meter.Int64Counter("received_messages", metric.WithDescription("Total number of messages received per RPC"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("received_messages counter: %w", err) - } - - t.ReceivedMessageErrors, err = meter.Int64Counter("received_message_errors", metric.WithDescription("Total number of errors for messages received per RPC"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("received_message_errors counter: %w", err) - } - - t.ReceivedBytes, err = meter.Int64Histogram("received_bytes", metric.WithDescription("Total received bytes per RPC"), metric.WithUnit("By")) - if err != nil { - return nil, fmt.Errorf("received_bytes histogram: %w", err) - } - - t.InboundRequestLatency, err = meter.Float64Histogram("inbound_request_latency", metric.WithDescription("Latency per RPC"), metric.WithUnit("ms")) - if err != nil { - return nil, fmt.Errorf("inbound_request_latency histogram: %w", err) - } - - t.OutboundRequestLatency, err = meter.Float64Histogram("outbound_request_latency", metric.WithDescription("Latency per RPC"), metric.WithUnit("ms")) - if err != nil { - return nil, fmt.Errorf("outbound_request_latency histogram: %w", err) - } - - t.SentMessages, err = meter.Int64Counter("sent_messages", metric.WithDescription("Total number of messages sent per RPC"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("sent_messages counter: %w", err) - } - - t.SentMessageErrors, err = meter.Int64Counter("sent_message_errors", metric.WithDescription("Total number of errors for messages sent per 
RPC"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("sent_message_errors counter: %w", err) - } - - t.SentRequests, err = meter.Int64Counter("sent_requests", metric.WithDescription("Total number of requests sent per RPC"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("sent_requests counter: %w", err) - } - - t.SentRequestErrors, err = meter.Int64Counter("sent_request_errors", metric.WithDescription("Total number of errors for requests sent per RPC"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("sent_request_errors counter: %w", err) - } - - t.SentBytes, err = meter.Int64Histogram("sent_bytes", metric.WithDescription("Total sent bytes per RPC"), metric.WithUnit("By")) - if err != nil { - return nil, fmt.Errorf("sent_bytes histogram: %w", err) - } - - t.LRUCache, err = meter.Int64Counter("lru_cache", metric.WithDescription("Cache hit or miss counter"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("lru_cache counter: %w", err) - } - - t.NetworkSize, err = meter.Int64Counter("network_size", metric.WithDescription("Network size estimation"), metric.WithUnit("1")) - if err != nil { - return nil, fmt.Errorf("network_size counter: %w", err) - } - - return t, nil -} +// attrsCtxKey is the actual context key value that's used as a key for +// metric values that are attached to a context. +var attrsCtxKey = ctxKey{} // MeterProviderOpts is a method that returns metric options. Make sure // to register these options to your [metric.MeterProvider]. Unfortunately, @@ -157,7 +40,7 @@ func New(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider // go-libp2p-kad-dht. 
var MeterProviderOpts = []motel.Option{ motel.WithView(motel.NewView( - motel.Instrument{Name: "*_bytes", Scope: instrumentation.Scope{Name: meterName}}, + motel.Instrument{Name: "*_bytes", Scope: instrumentation.Scope{Name: MeterName}}, motel.Stream{ Aggregation: motel.AggregationExplicitBucketHistogram{ Boundaries: []float64{1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296}, @@ -165,7 +48,7 @@ var MeterProviderOpts = []motel.Option{ }, )), motel.WithView(motel.NewView( - motel.Instrument{Name: "*_request_latency", Scope: instrumentation.Scope{Name: meterName}}, + motel.Instrument{Name: "*_request_latency", Scope: instrumentation.Scope{Name: MeterName}}, motel.Stream{ Aggregation: motel.AggregationExplicitBucketHistogram{ Boundaries: []float64{0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000}, diff --git a/v2/telemetry.go b/v2/telemetry.go new file mode 100644 index 00000000..86e1f3fc --- /dev/null +++ b/v2/telemetry.go @@ -0,0 +1,127 @@ +package dht + +import ( + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "github.com/libp2p/go-libp2p-kad-dht/v2/tele" +) + +// Telemetry is the struct that holds a reference to all metrics and the tracer. +// Initialize this struct with [NewTelemetry]. Make sure +// to also register the [MeterProviderOpts] with your custom or the global +// [metric.MeterProvider]. +// +// To see the documentation for each metric below, check out [NewTelemetry] and the +// metric.WithDescription() calls when initializing each metric. 
+type Telemetry struct { + Tracer trace.Tracer + ReceivedMessages metric.Int64Counter + ReceivedMessageErrors metric.Int64Counter + ReceivedBytes metric.Int64Histogram + InboundRequestLatency metric.Float64Histogram + OutboundRequestLatency metric.Float64Histogram + SentMessages metric.Int64Counter + SentMessageErrors metric.Int64Counter + SentRequests metric.Int64Counter + SentRequestErrors metric.Int64Counter + SentBytes metric.Int64Histogram + LRUCache metric.Int64Counter + NetworkSize metric.Int64Counter +} + +// NewWithGlobalProviders uses the global meter and tracer providers from +// opentelemetry. Check out the documentation of [MeterProviderOpts] for +// implications of using this constructor. +func NewWithGlobalProviders() (*Telemetry, error) { + return NewTelemetry(otel.GetMeterProvider(), otel.GetTracerProvider()) +} + +// NewTelemetry initializes a Telemetry struct with the given meter and tracer providers. +// It constructs the different metric counters and histograms. The histograms +// have custom boundaries. Therefore, the given [metric.MeterProvider] should +// have the custom view registered that [MeterProviderOpts] returns. 
+func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) (*Telemetry, error) { + var err error + + if meterProvider == nil { + meterProvider = otel.GetMeterProvider() + } + + if tracerProvider == nil { + tracerProvider = otel.GetTracerProvider() + } + + t := &Telemetry{ + Tracer: tracerProvider.Tracer(tele.TracerName), + } + + meter := meterProvider.Meter(tele.MeterName) + + // Initialize metrics for the DHT + + t.ReceivedMessages, err = meter.Int64Counter("received_messages", metric.WithDescription("Total number of messages received per RPC"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("received_messages counter: %w", err) + } + + t.ReceivedMessageErrors, err = meter.Int64Counter("received_message_errors", metric.WithDescription("Total number of errors for messages received per RPC"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("received_message_errors counter: %w", err) + } + + t.ReceivedBytes, err = meter.Int64Histogram("received_bytes", metric.WithDescription("Total received bytes per RPC"), metric.WithUnit("By")) + if err != nil { + return nil, fmt.Errorf("received_bytes histogram: %w", err) + } + + t.InboundRequestLatency, err = meter.Float64Histogram("inbound_request_latency", metric.WithDescription("Latency per RPC"), metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("inbound_request_latency histogram: %w", err) + } + + t.OutboundRequestLatency, err = meter.Float64Histogram("outbound_request_latency", metric.WithDescription("Latency per RPC"), metric.WithUnit("ms")) + if err != nil { + return nil, fmt.Errorf("outbound_request_latency histogram: %w", err) + } + + t.SentMessages, err = meter.Int64Counter("sent_messages", metric.WithDescription("Total number of messages sent per RPC"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("sent_messages counter: %w", err) + } + + t.SentMessageErrors, err = meter.Int64Counter("sent_message_errors", 
metric.WithDescription("Total number of errors for messages sent per RPC"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("sent_message_errors counter: %w", err) + } + + t.SentRequests, err = meter.Int64Counter("sent_requests", metric.WithDescription("Total number of requests sent per RPC"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("sent_requests counter: %w", err) + } + + t.SentRequestErrors, err = meter.Int64Counter("sent_request_errors", metric.WithDescription("Total number of errors for requests sent per RPC"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("sent_request_errors counter: %w", err) + } + + t.SentBytes, err = meter.Int64Histogram("sent_bytes", metric.WithDescription("Total sent bytes per RPC"), metric.WithUnit("By")) + if err != nil { + return nil, fmt.Errorf("sent_bytes histogram: %w", err) + } + + t.LRUCache, err = meter.Int64Counter("lru_cache", metric.WithDescription("Cache hit or miss counter"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("lru_cache counter: %w", err) + } + + t.NetworkSize, err = meter.Int64Counter("network_size", metric.WithDescription("Network size estimation"), metric.WithUnit("1")) + if err != nil { + return nil, fmt.Errorf("network_size counter: %w", err) + } + + return t, nil +}