Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: instrument failed chunk encoding/decoding #13684

Merged
merged 2 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"bytes"
"errors"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (i *Ingester) Flush() {
}

// TransferOut implements ring.FlushTransferer
// Noop implemenetation because ingesters have a WAL now that does not require transferring chunks any more.
// Noop implementation because ingesters have a WAL now that does not require transferring chunks any more.
// We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
Expand Down Expand Up @@ -179,7 +180,6 @@ func (i *Ingester) flushLoop(j int) {

m := util_log.WithUserID(op.userID, l)
err := i.flushOp(m, op)

if err != nil {
level.Error(m).Log("msg", "failed to flush", "err", err)
}
Expand Down Expand Up @@ -410,10 +410,15 @@ func (i *Ingester) encodeChunk(ctx context.Context, ch *chunk.Chunk, desc *chunk
}
start := time.Now()
chunkBytesSize := desc.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize))); err != nil {
return fmt.Errorf("chunk encoding: %w", err)
if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize)), i.logger); err != nil {
if !errors.Is(err, chunk.ErrChunkDecode) {
return fmt.Errorf("chunk encoding: %w", err)
}

i.metrics.chunkDecodeFailures.WithLabelValues(ch.UserID).Inc()
}
i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds())
i.metrics.chunksEncoded.WithLabelValues(ch.UserID).Inc()
return nil
}

Expand Down
30 changes: 24 additions & 6 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ingesterMetrics struct {
chunksFlushFailures prometheus.Counter
chunksFlushedPerReason *prometheus.CounterVec
chunkLifespan prometheus.Histogram
chunksEncoded *prometheus.CounterVec
chunkDecodeFailures *prometheus.CounterVec
flushedChunksStats *analytics.Counter
flushedChunksBytesStats *analytics.Statistics
flushedChunksLinesStats *analytics.Statistics
Expand Down Expand Up @@ -252,12 +254,28 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
// 1h -> 8hr
Buckets: prometheus.LinearBuckets(1, 1, 8),
}),
flushedChunksStats: analytics.NewCounter("ingester_flushed_chunks"),
flushedChunksBytesStats: analytics.NewStatistics("ingester_flushed_chunks_bytes"),
flushedChunksLinesStats: analytics.NewStatistics("ingester_flushed_chunks_lines"),
flushedChunksAgeStats: analytics.NewStatistics("ingester_flushed_chunks_age_seconds"),
flushedChunksLifespanStats: analytics.NewStatistics("ingester_flushed_chunks_lifespan_seconds"),
flushedChunksUtilizationStats: analytics.NewStatistics("ingester_flushed_chunks_utilization"),
chunksEncoded: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_chunks_encoded_total",
Help: "The total number of chunks encoded in the ingester.",
}, []string{"user"}),
chunkDecodeFailures: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_chunk_decode_failures_total",
Help: "The number of freshly encoded chunks that failed to decode.",
}, []string{"user"}),
flushedChunksStats: analytics.NewCounter("ingester_flushed_chunks"),
flushedChunksBytesStats: analytics.NewStatistics("ingester_flushed_chunks_bytes"),
flushedChunksLinesStats: analytics.NewStatistics("ingester_flushed_chunks_lines"),
flushedChunksAgeStats: analytics.NewStatistics(
"ingester_flushed_chunks_age_seconds",
),
flushedChunksLifespanStats: analytics.NewStatistics(
"ingester_flushed_chunks_lifespan_seconds",
),
flushedChunksUtilizationStats: analytics.NewStatistics(
"ingester_flushed_chunks_utilization",
),
chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "ingester_chunks_created_total",
Expand Down
35 changes: 33 additions & 2 deletions pkg/storage/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chunk
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"reflect"
"strconv"
Expand All @@ -12,13 +13,16 @@ import (

errs "errors"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

var (
Expand All @@ -27,6 +31,7 @@ var (
ErrMetadataLength = errs.New("chunk metadata wrong length")
ErrDataLength = errs.New("chunk data wrong length")
ErrSliceOutOfRange = errs.New("chunk can't be sliced out of its data range")
ErrChunkDecode = errs.New("error decoding freshly created chunk")
)

var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
Expand Down Expand Up @@ -227,11 +232,11 @@ var writerPool = sync.Pool{

// Encode writes the chunk into a buffer, and calculates the checksum.
func (c *Chunk) Encode() error {
return c.EncodeTo(nil)
return c.EncodeTo(nil, util_log.Logger)
}

// EncodeTo is like Encode but you can provide your own buffer to use.
func (c *Chunk) EncodeTo(buf *bytes.Buffer) error {
func (c *Chunk) EncodeTo(buf *bytes.Buffer, log log.Logger) error {
if buf == nil {
buf = bytes.NewBuffer(nil)
}
Expand Down Expand Up @@ -275,6 +280,32 @@ func (c *Chunk) EncodeTo(buf *bytes.Buffer) error {
// Now work out the checksum
c.encoded = buf.Bytes()
c.Checksum = crc32.Checksum(c.encoded, castagnoliTable)

newCh := Chunk{
ChunkRef: logproto.ChunkRef{
UserID: c.UserID,
Fingerprint: c.Fingerprint,
From: c.From,
Through: c.Through,
Checksum: c.Checksum,
},
}

if err := newCh.Decode(NewDecodeContext(), c.encoded); err != nil {
externalKey := fmt.Sprintf(
"%s/%x/%x:%x:%x",
c.UserID,
c.Fingerprint,
int64(c.From),
int64(c.Through),
c.Checksum,
)
level.Error(log).
Log("msg", "error decoding freshly created chunk", "err", err, "key", externalKey)

return ErrChunkDecode
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/chunk/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ChunkClientMetrics struct {
chunksSizePutPerUser *prometheus.CounterVec
chunksFetchedPerUser *prometheus.CounterVec
chunksSizeFetchedPerUser *prometheus.CounterVec
chunkDecodeFailures *prometheus.CounterVec
}

func NewChunkClientMetrics(reg prometheus.Registerer) ChunkClientMetrics {
Expand All @@ -53,6 +54,11 @@ func NewChunkClientMetrics(reg prometheus.Registerer) ChunkClientMetrics {
Name: "chunk_store_fetched_chunk_bytes_total",
Help: "Total bytes fetched in chunks per user.",
}, []string{"user"}),
chunkDecodeFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "chunk_store_decode_failures_total",
Help: "Total chunk decoding failures.",
}, []string{"user"}),
}
}

Expand Down Expand Up @@ -85,6 +91,17 @@ func (c MetricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk)
func (c MetricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
chks, err := c.Client.GetChunks(ctx, chunks)
if err != nil {
		// GetChunks fetches chunks in parallel and returns the first error encountered. As a result we don't know
		// which specific chunk failed, so we increment the failure metric for every tenant that has a chunk in the
		// result set. NOTE: in practice a single request typically fetches chunks for only one tenant, so any
		// over-counting should be rare — confirm against callers.
affectedUsers := map[string]struct{}{}
for _, chk := range chks {
affectedUsers[chk.UserID] = struct{}{}
}
for user := range affectedUsers {
c.metrics.chunkDecodeFailures.WithLabelValues(user).Inc()
}

return chks, err
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/chunk/client/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,14 @@ func (o *client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
}

if err := c.Decode(decodeContext, buf.Bytes()); err != nil {
return chunk.Chunk{}, errors.WithStack(fmt.Errorf("failed to decode chunk '%s': %w", key, err))
return chunk.Chunk{}, errors.WithStack(
fmt.Errorf(
"failed to decode chunk '%s' for tenant `%s`: %w",
key,
c.ChunkRef.UserID,
err,
),
)
}
return c, nil
}
Expand Down
Loading