Skip to content

Commit

Permalink
Integrate waku and api prometheus metrics correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
snormore committed Sep 27, 2023
1 parent a1811d3 commit 7181771
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 227 deletions.
55 changes: 20 additions & 35 deletions pkg/metrics/api-limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,31 @@ package metrics
import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"github.com/prometheus/client_golang/prometheus"
)

var bucketsNameKey = newTagKey("name")
var bucketsNameKey = "name"

var ratelimiterBucketsGaugeMeasure = stats.Int64("ratelimiter_buckets", "size of ratelimiter buckets map", stats.UnitDimensionless)
var ratelimiterBucketsGaugeView = &view.View{
Name: "xmtp_ratelimiter_buckets",
Measure: ratelimiterBucketsGaugeMeasure,
Description: "Size of rate-limiter buckets maps",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{bucketsNameKey},
}
var ratelimiterBuckets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtp_ratelimiter_buckets",
Help: "Size of rate-limiter buckets maps",
},
[]string{bucketsNameKey},
)

func EmitRatelimiterBucketsSize(ctx context.Context, log *zap.Logger, name string, size int) {
err := recordWithTags(ctx, []tag.Mutator{tag.Insert(bucketsNameKey, name)}, ratelimiterBucketsGaugeMeasure.M(int64(size)))
if err != nil {
log.Warn("recording metric",
zap.String("metric", ratelimiterBucketsGaugeMeasure.Name()),
zap.Error(err))
}
func EmitRatelimiterBucketsSize(ctx context.Context, name string, size int) {
ratelimiterBuckets.WithLabelValues(name).Add(float64(size))
}

var ratelimiterBucketsDeletedCounterMeasure = stats.Int64("xmtp_ratelimiter_entries_deleted", "Count of deleted entries from ratelimiter buckets map", stats.UnitDimensionless)
var ratelimiterBucketsDeletedCounterView = &view.View{
Name: "xmtp_ratelimiter_entries_deleted",
Measure: ratelimiterBucketsDeletedCounterMeasure,
Description: "Count of deleted entries from rate-limiter buckets maps",
Aggregation: view.Count(),
TagKeys: []tag.Key{bucketsNameKey},
}
var ratelimiterBucketsDeleted = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtp_ratelimiter_entries_deleted",
Help: "Count of deleted entries from rate-limiter buckets maps",
},
[]string{bucketsNameKey},
)

func EmitRatelimiterDeletedEntries(ctx context.Context, log *zap.Logger, name string, count int) {
err := recordWithTags(ctx, []tag.Mutator{tag.Insert(bucketsNameKey, name)}, ratelimiterBucketsDeletedCounterMeasure.M(int64(count)))
if err != nil {
log.Warn("recording metric",
zap.String("metric", ratelimiterBucketsDeletedCounterMeasure.Name()),
zap.Error(err))
}
func EmitRatelimiterDeletedEntries(ctx context.Context, name string, count int) {
ratelimiterBucketsDeleted.WithLabelValues(name).Add(float64(count))
}
201 changes: 77 additions & 124 deletions pkg/metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,176 +4,129 @@ import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
proto "github.com/xmtp/proto/v3/go/message_api/v1"
apicontext "github.com/xmtp/xmtp-node-go/pkg/api/message/v1/context"
"github.com/xmtp/xmtp-node-go/pkg/logging"
"github.com/xmtp/xmtp-node-go/pkg/topic"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
appClientVersionTagKeys = []tag.Key{
newTagKey("client"),
newTagKey("client_version"),
newTagKey("app"),
newTagKey("app_version"),
appClientVersionTagKeys = []string{
"client",
"client_version",
"app",
"app_version",
}
apiRequestTagKeys = append([]tag.Key{
newTagKey("service"),
newTagKey("method"),
newTagKey("error_code"),
apiRequestTagKeys = append([]string{
"service",
"method",
"error_code",
}, appClientVersionTagKeys...)

apiRequestTagKeysByName = buildTagKeysByName(apiRequestTagKeys)
)
apiRequestTagKeysByName = buildTagKeysMap(apiRequestTagKeys)

var apiRequestsMeasure = stats.Int64("api_requests", "Count api requests", stats.UnitDimensionless)
topicCategoryTag = "topic-category"
queryParametersTag = "parameters"
queryErrorTag = "error"
)

var apiRequestsView = &view.View{
Name: "xmtp_api_requests",
Measure: apiRequestsMeasure,
Description: "Count of api requests",
Aggregation: view.Count(),
TagKeys: apiRequestTagKeys,
}
var apiRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtp_api_requests",
Help: "Count of api requests",
},
apiRequestTagKeys,
)

func EmitAPIRequest(ctx context.Context, log *zap.Logger, fields []zapcore.Field) {
mutators := make([]tag.Mutator, 0, len(fields))
labels := prometheus.Labels{}
for _, field := range fields {
key, ok := apiRequestTagKeysByName[field.Key]
if !ok {
if !apiRequestTagKeysByName[field.Key] {
continue
}
mutators = append(mutators, tag.Insert(key, field.String))
}
err := recordWithTags(ctx, mutators, apiRequestsMeasure.M(1))
if err != nil {
log.Error("recording metric", fields...)
labels[field.Key] = field.String
}
apiRequests.With(labels).Inc()
}

var topicCategoryTag, _ = tag.NewKey("topic-category")
var publishedEnvelopeMeasure = stats.Int64("published_envelope", "size of a published envelope", stats.UnitBytes)
var publishedEnvelopeView = &view.View{
Name: "xmtp_published_envelope",
Measure: publishedEnvelopeMeasure,
Description: "Size of a published envelope",
Aggregation: view.Distribution(100, 1000, 10000, 100000),
TagKeys: append([]tag.Key{topicCategoryTag}, appClientVersionTagKeys...),
}
var publishedEnvelopeSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtp_published_envelope",
Help: "Size of a published envelope in bytes",
Buckets: []float64{100, 1000, 10000, 100000},
},
append([]string{topicCategoryTag}, appClientVersionTagKeys...),
)

var publishedEnvelopeCounterMeasure = stats.Int64("published_envelopes", "published envelopes", stats.UnitDimensionless)
var publishedEnvelopeCounterView = &view.View{
Name: "xmtp_published_envelopes",
Measure: publishedEnvelopeCounterMeasure,
Description: "Count of published envelopes",
Aggregation: view.Count(),
TagKeys: append([]tag.Key{topicCategoryTag}, appClientVersionTagKeys...),
}
var publishedEnvelopeCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "xmtp_published_envelopes",
Help: "Count of published envelopes",
},
append([]string{topicCategoryTag}, appClientVersionTagKeys...),
)

func EmitPublishedEnvelope(ctx context.Context, log *zap.Logger, env *proto.Envelope) {
mutators := contextMutators(ctx)
labels := contextLabels(ctx)
topicCategory := topic.Category(env.ContentTopic)
mutators = append(mutators, tag.Insert(topicCategoryTag, topicCategory))
size := int64(len(env.Message))
err := recordWithTags(ctx, mutators, publishedEnvelopeMeasure.M(size))
if err != nil {
log.Error("recording metric",
zap.Error(err),
zap.String("metric", publishedEnvelopeView.Name),
zap.Int64("size", size),
zap.String("topic_category", topicCategory),
)
}
err = recordWithTags(ctx, mutators, publishedEnvelopeCounterMeasure.M(1))
if err != nil {
log.Error("recording metric",
zap.Error(err),
zap.String("metric", publishedEnvelopeCounterView.Name),
zap.Int64("size", size),
zap.String("topic_category", topicCategory),
)
}
labels[topicCategoryTag] = topicCategory
publishedEnvelopeSize.With(labels).Observe(float64(len(env.Message)))
publishedEnvelopeCount.With(labels).Inc()
}

func contextMutators(ctx context.Context) []tag.Mutator {
func contextLabels(ctx context.Context) prometheus.Labels {
ri := apicontext.NewRequesterInfo(ctx)
fields := ri.ZapFields()
mutators := make([]tag.Mutator, 0, len(fields)+1)
labels := prometheus.Labels{}
for _, field := range fields {
key, ok := apiRequestTagKeysByName[field.Key]
if !ok {
if !apiRequestTagKeysByName[field.Key] {
continue
}
mutators = append(mutators, tag.Insert(key, field.String))
labels[field.Key] = field.String
}
return mutators
return labels
}

func buildTagKeysByName(keys []tag.Key) map[string]tag.Key {
m := map[string]tag.Key{}
func buildTagKeysMap(keys []string) map[string]bool {
m := map[string]bool{}
for _, key := range keys {
m[key.Name()] = key
m[key] = true
}
return m
}

func newTagKey(str string) tag.Key {
key, _ := tag.NewKey(str)
return key
}

var queryParametersTag, _ = tag.NewKey("parameters")
var queryErrorTag, _ = tag.NewKey("error")
var queryDurationMeasure = stats.Int64("api_query_duration", "duration of API query", stats.UnitMilliseconds)
var queryDurationView = &view.View{
Name: "xmtp_api_query_duration",
Measure: queryDurationMeasure,
Description: "duration of API query (ms)",
Aggregation: view.Distribution(1, 10, 100, 1000, 10000, 100000),
TagKeys: append([]tag.Key{topicCategoryTag, queryErrorTag, queryParametersTag}, appClientVersionTagKeys...),
}
var queryDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtp_api_query_duration",
Help: "Duration of API query (ms)",
Buckets: []float64{1, 10, 100, 1000, 10000, 100000},
},
append([]string{topicCategoryTag, queryErrorTag, queryParametersTag}, appClientVersionTagKeys...),
)

var queryResultMeasure = stats.Int64("api_query_result", "number of events returned by an API query", stats.UnitNone)
var queryResultView = &view.View{
Name: "xmtp_api_query_result",
Measure: queryResultMeasure,
Description: "number of events returned by an API query",
Aggregation: view.Distribution(1, 10, 100, 1000, 10000, 100000),
TagKeys: append([]tag.Key{topicCategoryTag, queryErrorTag, queryParametersTag}, appClientVersionTagKeys...),
}
var queryResultLength = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmtp_api_query_result",
Help: "Number of events returned by an API query",
Buckets: []float64{1, 10, 100, 1000, 10000, 100000},
},
append([]string{topicCategoryTag, queryErrorTag, queryParametersTag}, appClientVersionTagKeys...),
)

func EmitQuery(ctx context.Context, log *zap.Logger, req *proto.QueryRequest, results int, err error, duration time.Duration) {
mutators := []tag.Mutator{}
func EmitQuery(ctx context.Context, req *proto.QueryRequest, results int, err error, duration time.Duration) {
labels := prometheus.Labels{}
if len(req.ContentTopics) > 0 {
topicCategory := topic.Category(req.ContentTopics[0])
mutators = append(mutators, tag.Insert(topicCategoryTag, topicCategory))
labels[topicCategoryTag] = topic.Category(req.ContentTopics[0])
}
if err != nil {
mutators = append(mutators, tag.Insert(queryErrorTag, "internal"))
labels[queryErrorTag] = "internal"
}
parameters := logging.QueryShape(req)
mutators = append(mutators, tag.Insert(queryParametersTag, parameters))
err = recordWithTags(ctx, mutators, queryDurationMeasure.M(duration.Milliseconds()))
if err != nil {
log.Error("recording metric",
zap.Error(err),
zap.Duration("duration", duration),
zap.String("parameters", parameters),
zap.String("metric", queryDurationView.Name),
)
}
err = recordWithTags(ctx, mutators, queryResultMeasure.M(int64(results)))
if err != nil {
log.Error("recording metric",
zap.Error(err),
zap.Int("results", results),
zap.String("parameters", parameters),
zap.String("metric", queryResultView.Name),
)
}
labels[queryParametersTag] = parameters

queryDuration.With(labels).Observe(float64(duration.Milliseconds()))
queryResultLength.With(labels).Observe(float64(results))
}
41 changes: 16 additions & 25 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/waku-org/go-waku/waku/metrics"
"github.com/xmtp/xmtp-node-go/pkg/tracing"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -45,27 +43,20 @@ func (s *Server) Stop(ctx context.Context) error {
return err
}

func RegisterViews(logger *zap.Logger) {
if err := view.Register(
PeersByProtoView,
BootstrapPeersView,
StoredMessageView,
apiRequestsView,
publishedEnvelopeView,
publishedEnvelopeCounterView,
queryDurationView,
queryResultView,
ratelimiterBucketsGaugeView,
ratelimiterBucketsDeletedCounterView,
); err != nil {
logger.Fatal("registering metrics views", zap.Error(err))
func RegisterViews(logger *zap.Logger, reg prometheus.Registerer) {
cols := []prometheus.Collector{
PeersByProto,
BootstrapPeers,
StoredMessages,
apiRequests,
publishedEnvelopeSize,
publishedEnvelopeCount,
queryDuration,
queryResultLength,
// ratelimiterBucketsGauge,
// ratelimiterBucketsDeletedCounter,
}
for _, col := range cols {
reg.MustRegister(col)
}
}

func record(ctx context.Context, measurement stats.Measurement) {
stats.Record(ctx, measurement)
}

func recordWithTags(ctx context.Context, mutators []tag.Mutator, measurement stats.Measurement) error {
return stats.RecordWithTags(ctx, mutators, measurement)
}
Loading

0 comments on commit 7181771

Please sign in to comment.