Skip to content

Commit

Permalink
Add cortex_frontend_query_result_cache_requests_total and cortex_frontend_query_result_cache_hits_total metrics (#5235)
Browse files Browse the repository at this point in the history

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored Jun 14, 2023
1 parent 42c90fd commit ce6dd45
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 16 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
* [CHANGE] Store-gateway: skip verifying index header integrity upon loading. To enable verification set `blocks_storage.bucket_store.index_header.verify_on_load: true`.
* [CHANGE] Querier: change the default value of the experimental `-querier.streaming-chunks-per-ingester-buffer-size` flag to 256. #5203
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality query responses. The cache will be used when `-query-frontend.cache-results` is enabled and `-query-frontend.results-cache-ttl-for-cardinality-query` set to a value greater than 0. #5212
* [FEATURE] Query-frontend: added experimental support to cache cardinality query responses. The cache will be used when `-query-frontend.cache-results` is enabled and `-query-frontend.results-cache-ttl-for-cardinality-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235
* `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality"}`
* `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality"}`
* [ENHANCEMENT] Cardinality API: When zone aware replication is enabled, the label values cardinality API can now tolerate single zone failure #5178
* [ENHANCEMENT] Distributor: optimize sending requests to ingesters when incoming requests don't need to be modified. For now this feature can be disabled by setting `-timeseries-unmarshal-caching-optimization-enabled=false`. #5137
* [ENHANCEMENT] Add advanced CLI flags to control gRPC client behaviour: #5161
Expand Down
23 changes: 14 additions & 9 deletions pkg/frontend/querymiddleware/cardinality_query_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"

apierror "github.com/grafana/mimir/pkg/api/error"
"github.com/grafana/mimir/pkg/cardinality"
Expand All @@ -39,18 +40,20 @@ type cardinalityQueryRequest interface {
// cardinalityQueryCache is a http.RoundTripper wrapping the downstream with an HTTP response cache.
// This RoundTripper is used to add caching support to cardinality analysis API endpoints.
type cardinalityQueryCache struct {
	cache  cache.Cache
	limits Limits
	// metrics tracks cache lookups and hits, labelled request_type="cardinality".
	metrics *resultsCacheMetrics
	next    http.RoundTripper
	logger  log.Logger
}

func newCardinalityQueryCacheRoundTripper(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger) http.RoundTripper {
func newCardinalityQueryCacheRoundTripper(cache cache.Cache, limits Limits, next http.RoundTripper, logger log.Logger, reg prometheus.Registerer) http.RoundTripper {
return &cardinalityQueryCache{
cache: cache,
limits: limits,
next: next,
logger: logger,
cache: cache,
limits: limits,
metrics: newResultsCacheMetrics("cardinality", reg),
next: next,
logger: logger,
}
}

Expand Down Expand Up @@ -92,9 +95,11 @@ func (c *cardinalityQueryCache) RoundTrip(req *http.Request) (*http.Response, er
}

// Lookup the cache.
c.metrics.cacheRequests.Inc()
cacheKey, hashedCacheKey := generateCardinalityQueryRequestCacheKey(tenantIDs, queryReq)
res := c.fetchCachedResponse(ctx, cacheKey, hashedCacheKey)
if res != nil {
c.metrics.cacheHits.Inc()
level.Debug(spanLog).Log("msg", "response fetched from the cache")
return res, nil
}
Expand Down
44 changes: 40 additions & 4 deletions pkg/frontend/querymiddleware/cardinality_query_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package querymiddleware
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
Expand All @@ -14,6 +15,8 @@ import (

"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,6 +50,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader http.Header
expectedBody []byte
expectedDownstreamCalled bool
expectedLookupFromCache bool
expectedStoredToCache bool
}{
"should fetch the response from the downstream and store it the cache if the downstream request succeed": {
Expand All @@ -56,6 +60,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: true,
},
"should not store the response in the cache if disabled for the tenant": {
Expand All @@ -65,6 +70,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: false,
expectedStoredToCache: false,
},
"should not store the response in the cache if disabled for the request": {
Expand All @@ -75,24 +81,27 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: false,
expectedStoredToCache: false,
},
"should not store the response in the cache if the downstream returned a 4xx status code": {
cacheTTL: 0,
cacheTTL: time.Minute,
downstreamRes: downstreamRes(400, []byte(`{error:"400"}`)),
expectedStatusCode: 400,
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{error:"400"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: false,
},
"should not store the response in the cache if the downstream returned a 5xx status code": {
cacheTTL: 0,
cacheTTL: time.Minute,
downstreamRes: downstreamRes(500, []byte(`{error:"500"}`)),
expectedStatusCode: 500,
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{error:"500"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: false,
},
"should fetch the response from the cache if the cached response is not expired": {
Expand All @@ -109,6 +118,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"cached"}`),
expectedDownstreamCalled: false, // Should not call the downstream.
expectedLookupFromCache: true,
expectedStoredToCache: false, // Should not store anything to the cache.
},
"should fetch the response from the downstream and overwrite the cached response if corrupted": {
Expand All @@ -121,6 +131,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: true,
},
"should fetch the response from the downstream and overwrite the cached response if a key collision was detected": {
Expand All @@ -137,6 +148,7 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
expectedHeader: http.Header{"Content-Type": []string{"application/json"}},
expectedBody: []byte(`{content:"fresh"}`),
expectedDownstreamCalled: true,
expectedLookupFromCache: true,
expectedStoredToCache: true,
},
}
Expand Down Expand Up @@ -187,7 +199,8 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
}
initialStoreCallsCount := cacheBackend.CountStoreCalls()

rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t))
reg := prometheus.NewPedanticRegistry()
rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t), reg)
res, err := rt.RoundTrip(req)
require.NoError(t, err)

Expand Down Expand Up @@ -220,6 +233,29 @@ func TestCardinalityQueryCache_RoundTrip(t *testing.T) {
} else {
assert.Equal(t, initialStoreCallsCount, cacheBackend.CountStoreCalls())
}

// Assert on metrics.
expectedRequestsCount := 0
expectedHitsCount := 0
if testData.expectedLookupFromCache {
expectedRequestsCount = 1
if !testData.expectedDownstreamCalled {
expectedHitsCount = 1
}
}

assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="cardinality"} %d
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="cardinality"} %d
`, expectedRequestsCount, expectedHitsCount)),
"cortex_frontend_query_result_cache_requests_total",
"cortex_frontend_query_result_cache_hits_total",
))
})
}
})
Expand Down Expand Up @@ -279,7 +315,7 @@ func TestCardinalityQueryCache_RoundTrip_WithTenantFederation(t *testing.T) {
cacheBackend := cache.NewInstrumentedMockCache()
limits := multiTenantMockLimits{byTenant: testData.limits}

rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t))
rt := newCardinalityQueryCacheRoundTripper(cacheBackend, limits, downstream, testutil.NewLogger(t), nil)
res, err := rt.RoundTrip(req)
require.NoError(t, err)

Expand Down
21 changes: 21 additions & 0 deletions pkg/frontend/querymiddleware/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
Expand Down Expand Up @@ -93,6 +94,26 @@ func errUnsupportedResultsCacheBackend(backend string) error {
return fmt.Errorf("%w: %q, supported values: %v", errUnsupportedBackend, backend, supportedResultsCacheBackends)
}

// resultsCacheMetrics holds the counters used to track the query results cache
// hit ratio for a single cache user (one request_type label value).
type resultsCacheMetrics struct {
// cacheRequests counts requests (or partial requests) looked up in the results cache.
cacheRequests prometheus.Counter
// cacheHits counts requests (or partial requests) served from the results cache.
cacheHits prometheus.Counter
}

// newResultsCacheMetrics registers and returns the results cache counters for the
// given request type. Both counters carry a constant request_type label (e.g.
// "query_range", "cardinality") so the cache hit ratio can be computed per user
// of the results cache.
func newResultsCacheMetrics(requestType string, reg prometheus.Registerer) *resultsCacheMetrics {
	constLabels := prometheus.Labels{"request_type": requestType}

	requests := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name:        "cortex_frontend_query_result_cache_requests_total",
		Help:        "Total number of requests (or partial requests) looked up in the results cache.",
		ConstLabels: constLabels,
	})
	hits := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name:        "cortex_frontend_query_result_cache_hits_total",
		Help:        "Total number of requests (or partial requests) fetched from the results cache.",
		ConstLabels: constLabels,
	})

	return &resultsCacheMetrics{
		cacheRequests: requests,
		cacheHits:     hits,
	}
}

// newResultsCache creates a new results cache based on the input configuration.
func newResultsCache(cfg ResultsCacheConfig, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
// Add the "component" label similarly to other components, so that metrics don't clash and have the same labels set
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func newQueryTripperware(
// Inject the cardinality query cache roundtripper only if the query results cache is enabled.
cardinality := next
if cfg.CacheResults {
cardinality = newCardinalityQueryCacheRoundTripper(c, limits, next, log)
cardinality = newCardinalityQueryCacheRoundTripper(c, limits, next, log, registerer)
}

return RoundTripFunc(func(r *http.Request) (*http.Response, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/frontend/querymiddleware/split_and_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ var (
)

type splitAndCacheMiddlewareMetrics struct {
*resultsCacheMetrics

splitQueriesCount prometheus.Counter
queryResultCacheAttemptedCount prometheus.Counter
queryResultCacheSkippedCount *prometheus.CounterVec
}

func newSplitAndCacheMiddlewareMetrics(reg prometheus.Registerer) *splitAndCacheMiddlewareMetrics {
m := &splitAndCacheMiddlewareMetrics{
resultsCacheMetrics: newResultsCacheMetrics("query_range", reg),
splitQueriesCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_frontend_split_queries_total",
Help: "Total number of underlying query requests after the split by interval is applied.",
Expand Down Expand Up @@ -344,7 +347,10 @@ func (s *splitAndCacheMiddleware) fetchCacheExtents(ctx context.Context, now tim
spanLog.LogKV("key", key, "hashedKey", hashed)
}

// Lookup the cache.
s.metrics.cacheRequests.Add(float64(len(keys)))
founds := s.cache.Fetch(ctx, hashedKeys)
s.metrics.cacheHits.Add(float64(len(founds)))

// Decode all cached responses.
extents := make([][]Extent, len(keys))
Expand Down
47 changes: 46 additions & 1 deletion pkg/frontend/querymiddleware/split_and_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ func TestSplitAndCacheMiddleware_SplitByInterval(t *testing.T) {
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 4
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 0
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 0
`)))

// Assert query stats from context
Expand All @@ -231,6 +237,7 @@ func TestSplitAndCacheMiddleware_SplitByInterval(t *testing.T) {
func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
cacheBackend := cache.NewInstrumentedMockCache()

reg := prometheus.NewPedanticRegistry()
mw := newSplitAndCacheMiddleware(
true,
true,
Expand All @@ -243,7 +250,7 @@ func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
PrometheusResponseExtractor{},
resultsCacheAlwaysEnabled,
log.NewNopLogger(),
prometheus.NewPedanticRegistry(),
reg,
)

expectedResponse := &PrometheusResponse{
Expand Down Expand Up @@ -333,6 +340,31 @@ func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
// Assert query stats from context
queryStats = stats.FromContext(ctx)
assert.Equal(t, uint32(2), queryStats.LoadSplitQueries())

// Assert metrics
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_frontend_query_result_cache_attempted_total Total number of queries that were attempted to be fetched from cache.
# TYPE cortex_frontend_query_result_cache_attempted_total counter
cortex_frontend_query_result_cache_attempted_total 3
# HELP cortex_frontend_query_result_cache_skipped_total Total number of times a query was not cacheable because of a reason. This metric is tracked for each partial query when time-splitting is enabled.
# TYPE cortex_frontend_query_result_cache_skipped_total counter
cortex_frontend_query_result_cache_skipped_total{reason="has-modifiers"} 0
cortex_frontend_query_result_cache_skipped_total{reason="too-new"} 0
cortex_frontend_query_result_cache_skipped_total{reason="unaligned-time-range"} 0
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 3
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 3
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 2
`)))
}

func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotLookupCacheIfStepIsNotAligned(t *testing.T) {
Expand Down Expand Up @@ -423,6 +455,7 @@ func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotLookupCacheIfStepIsNotAli
// Assert query stats from context
queryStats := stats.FromContext(ctx)
assert.Equal(t, uint32(1), queryStats.LoadSplitQueries())

// Assert metrics
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_frontend_query_result_cache_attempted_total Total number of queries that were attempted to be fetched from cache.
Expand All @@ -436,6 +469,12 @@ func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotLookupCacheIfStepIsNotAli
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 1
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 0
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 0
`)))
}

Expand Down Expand Up @@ -572,6 +611,12 @@ func TestSplitAndCacheMiddleware_ResultsCache_ShouldNotCacheRequestEarlierThanMa
# HELP cortex_frontend_split_queries_total Total number of underlying query requests after the split by interval is applied.
# TYPE cortex_frontend_split_queries_total counter
cortex_frontend_split_queries_total 0
# HELP cortex_frontend_query_result_cache_hits_total Total number of requests (or partial requests) fetched from the results cache.
# TYPE cortex_frontend_query_result_cache_hits_total counter
cortex_frontend_query_result_cache_hits_total{request_type="query_range"} 0
# HELP cortex_frontend_query_result_cache_requests_total Total number of requests (or partial requests) looked up in the results cache.
# TYPE cortex_frontend_query_result_cache_requests_total counter
cortex_frontend_query_result_cache_requests_total{request_type="query_range"} 0
`,
},
"should cache a response up until max cache freshness time ago": {
Expand Down

0 comments on commit ce6dd45

Please sign in to comment.