Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: default to prometheus.DefaultRegisterer #722

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The following emojis are used to highlight certain changes:

- The above is only necessary if content routing is needed. Otherwise:

```
```go
// Create network: no contentRouter anymore
bswapnet := network.NewFromIpfsHost(host)
// Create Bitswap: a new "discovery" parameter set to nil (disable content discovery)
Expand All @@ -64,6 +64,8 @@ The following emojis are used to highlight certain changes:

- `routing/http/server`: added built-in Prometheus instrumentation to http delegated `/routing/v1/` endpoints, with custom buckets for response size and duration to match real world data observed at [the `delegated-ipfs.dev` instance](https://docs.ipfs.tech/concepts/public-utilities/#delegated-routing). [#718](https://github.com/ipfs/boxo/pull/718) [#724](https://github.com/ipfs/boxo/pull/724)
- `routing/http/server`: added configurable routing timeout (`DefaultRoutingTimeout` being 30s) to prevent indefinite hangs during content/peer routing. Set custom duration via `WithRoutingTimeout`. [#720](https://github.com/ipfs/boxo/pull/720)
- `routing/http/server`: exposes Prometheus metrics on `prometheus.DefaultRegisterer` and a custom one can be provided via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)
- `gateway`: `NewCacheBlockStore` and `NewCarBackend` will use `prometheus.DefaultRegisterer` when a custom one is not specified via `WithPrometheusRegistry` [#722](https://github.com/ipfs/boxo/pull/722)

### Changed

Expand Down
19 changes: 12 additions & 7 deletions gateway/backend_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
return nil, err
}

var promReg prometheus.Registerer = prometheus.NewRegistry()
var promReg prometheus.Registerer = prometheus.DefaultRegisterer
if compiledOptions.promRegistry != nil {
promReg = compiledOptions.promRegistry
}
Expand Down Expand Up @@ -117,6 +117,11 @@
}

func registerCarBackendMetrics(promReg prometheus.Registerer) *CarBackendMetrics {
// make sure we have functional registry
if promReg == nil {
promReg = prometheus.DefaultRegisterer
}

Check warning on line 123 in gateway/backend_car.go

View check run for this annotation

Codecov / codecov/patch

gateway/backend_car.go#L122-L123

Added lines #L122 - L123 were not covered by tests

// How many CAR Fetch attempts we had? Need this to calculate % of various car request types.
// We only count attempts here, because success/failure with/without retries are provided by caboose:
// - ipfs_caboose_fetch_duration_car_success_count
Expand All @@ -129,15 +134,15 @@
Name: "car_fetch_attempts",
Help: "The number of times a CAR fetch was attempted by IPFSBackend.",
})
promReg.MustRegister(carFetchAttemptMetric)
registerMetric(promReg, carFetchAttemptMetric)

contextAlreadyCancelledMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_car_backend",
Name: "car_fetch_context_already_cancelled",
Help: "The number of times context is already cancelled when a CAR fetch was attempted by IPFSBackend.",
})
promReg.MustRegister(contextAlreadyCancelledMetric)
registerMetric(promReg, contextAlreadyCancelledMetric)

// How many blocks were read via CARs?
// Need this as a baseline to reason about error ratio vs raw_block_recovery_attempts.
Expand All @@ -147,15 +152,15 @@
Name: "car_blocks_fetched",
Help: "The number of blocks successfully read via CAR fetch.",
})
promReg.MustRegister(carBlocksFetchedMetric)
registerMetric(promReg, carBlocksFetchedMetric)

carParamsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_car_backend",
Name: "car_fetch_params",
Help: "How many times specific CAR parameter was used during CAR data fetch.",
}, []string{"dagScope", "entityRanges"}) // we use 'ranges' instead of 'bytes' here because we only count the number of ranges present
promReg.MustRegister(carParamsMetric)
registerMetric(promReg, carParamsMetric)

bytesRangeStartMetric := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ipfs",
Expand All @@ -164,7 +169,7 @@
Help: "Tracks where did the range request start.",
Buckets: prometheus.ExponentialBuckets(1024, 2, 24), // 1024 bytes to 8 GiB
})
promReg.MustRegister(bytesRangeStartMetric)
registerMetric(promReg, bytesRangeStartMetric)

bytesRangeSizeMetric := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "ipfs",
Expand All @@ -173,7 +178,7 @@
Help: "Tracks the size of range requests.",
Buckets: prometheus.ExponentialBuckets(256*1024, 2, 10), // From 256KiB to 100MiB
})
promReg.MustRegister(bytesRangeSizeMetric)
registerMetric(promReg, bytesRangeSizeMetric)

return &CarBackendMetrics{
contextAlreadyCancelledMetric,
Expand Down
16 changes: 4 additions & 12 deletions gateway/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@
// in memory using a two queue cache. It can be useful, for example, when paired
// with a proxy blockstore (see [NewRemoteBlockstore]).
//
// If the given [prometheus.Registerer] is nil, a new one will be created using
// [prometheus.NewRegistry].
// If the given [prometheus.Registerer] is nil, a [prometheus.DefaultRegisterer] will be used.
func NewCacheBlockStore(size int, reg prometheus.Registerer) (blockstore.Blockstore, error) {
c, err := lru.New2Q[string, []byte](size)
if err != nil {
return nil, err
}

if reg == nil {
reg = prometheus.NewRegistry()
reg = prometheus.DefaultRegisterer

Check warning on line 46 in gateway/blockstore.go

View check run for this annotation

Codecov / codecov/patch

gateway/blockstore.go#L46

Added line #L46 was not covered by tests
}

cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{
Expand All @@ -61,15 +60,8 @@
Help: "The number of global block cache requests.",
})

err = reg.Register(cacheHitsMetric)
if err != nil {
return nil, err
}

err = reg.Register(cacheRequestsMetric)
if err != nil {
return nil, err
}
registerMetric(reg, cacheHitsMetric)
registerMetric(reg, cacheRequestsMetric)

Check warning on line 64 in gateway/blockstore.go

View check run for this annotation

Codecov / codecov/patch

gateway/blockstore.go#L63-L64

Added lines #L63 - L64 were not covered by tests

return &cacheBlockStore{
cache: c,
Expand Down
11 changes: 11 additions & 0 deletions gateway/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,14 @@ var tracer = otel.Tracer("boxo/gateway")
func spanTrace(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return tracer.Start(ctx, "Gateway."+spanName, opts...)
}

// registerMetric registers metrics in registry or logs an error.
//
// Registration may error if metric is alreadyregistered. we are not using
// MustRegister here to allow people to run tests in parallel without having to
// write tedious glue code that creates unique registry for each unit test
func registerMetric(registry prometheus.Registerer, metric prometheus.Collector) {
if err := registry.Register(metric); err != nil {
log.Errorf("failed to register %v: %v", metric, err)
}
}
18 changes: 16 additions & 2 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"mime"
"net/http"
"strings"
"sync/atomic"
"time"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -152,14 +153,25 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler {
}

if server.promRegistry == nil {
server.promRegistry = prometheus.NewRegistry()
server.promRegistry = prometheus.DefaultRegisterer
}

// Workaround due to https://github.com/slok/go-http-metrics
// using egistry.MustRegister internally.
// In production there will be only one handler, however we append counter
// to ensure duplicate metric registration will not panic in parallel tests
// when global prometheus.DefaultRegisterer is used.
metricsPrefix := "delegated_routing_server"
c := handlerCount.Add(1)
if c > 1 {
metricsPrefix = fmt.Sprintf("%s_%d", metricsPrefix, c)
}

// Create middleware with prometheus recorder
mdlw := middleware.New(middleware.Config{
Recorder: metrics.NewRecorder(metrics.Config{
Registry: server.promRegistry,
Prefix: "delegated_routing_server",
Prefix: metricsPrefix,
SizeBuckets: prometheus.ExponentialBuckets(100, 4, 8), // [100 400 1600 6400 25600 102400 409600 1.6384e+06]
DurationBuckets: []float64{0.1, 0.5, 1, 2, 5, 8, 10, 20, 30},
}),
Expand All @@ -176,6 +188,8 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler {
return r
}

var handlerCount atomic.Int32

type server struct {
svc ContentRouter
disableNDJSON bool
Expand Down
Loading