From d8dbf09db33414575d5bafb26014b5e6891bdcd9 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 6 Sep 2024 11:37:16 +0200 Subject: [PATCH] Remove snappy compression and add more metrics --- pkg/kafka/encoding.go | 124 +++++--------------- pkg/kafka/encoding_test.go | 12 +- pkg/kafka/ingester/consumer_test.go | 2 +- pkg/kafka/ingester/partition_reader_test.go | 10 +- pkg/kafka/tee/tee.go | 54 ++++++++- 5 files changed, 93 insertions(+), 109 deletions(-) diff --git a/pkg/kafka/encoding.go b/pkg/kafka/encoding.go index a2634c8d427f..49de2644ccb4 100644 --- a/pkg/kafka/encoding.go +++ b/pkg/kafka/encoding.go @@ -2,12 +2,10 @@ package kafka import ( - "bytes" "errors" "fmt" math_bits "math/bits" - "github.com/golang/snappy" "github.com/twmb/franz-go/pkg/kgo" lru "github.com/hashicorp/golang-lru" @@ -17,33 +15,15 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" ) -const defaultMaxSize = 10 << 20 // 10 MB - // Encoder is responsible for encoding logproto.Stream data into Kafka records. -// It handles compression and splitting of large streams into multiple records. +// It handles splitting of large streams into multiple records. type Encoder struct { - writer *snappy.Writer - buff []byte - batch logproto.Stream - maxSize int -} - -// NewEncoderWithMaxSize creates a new Encoder with a specified maximum record size. -// If maxSize is <= 0, it defaults to 10MB. -func NewEncoderWithMaxSize(maxSize int) *Encoder { - if maxSize <= 0 { - maxSize = defaultMaxSize - } - return &Encoder{ - writer: snappy.NewBufferedWriter(nil), - buff: make([]byte, 0, maxSize), - maxSize: maxSize, - } + batch logproto.Stream } -// NewEncoder creates a new Encoder with the default maximum record size of 10MB. +// NewEncoder creates a new Encoder. func NewEncoder() *Encoder { - return NewEncoderWithMaxSize(defaultMaxSize) + return &Encoder{} } // Encode converts a logproto.Stream into one or more Kafka records. @@ -52,19 +32,24 @@ func NewEncoder() *Encoder { // The encoding process works as follows: // 1. If the stream size is smaller than maxSize, it's encoded into a single record. // 2. For larger streams, it splits the entries into multiple batches, each under maxSize. -// 3. Each batch is compressed using Snappy compression. -// 4. The compressed data is wrapped in a Kafka record with the tenant ID as the key. +// 3. The data is wrapped in a Kafka record with the tenant ID as the key. 
// // The format of each record is: // - Key: Tenant ID (used for routing, not for partitioning) -// - Value: Snappy-compressed protobuf serialized logproto.Stream +// - Value: Protobuf serialized logproto.Stream // - Partition: As specified in the partitionID parameter -func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Stream) ([]*kgo.Record, error) { +// +// Parameters: +// - partitionID: The Kafka partition ID for the record +// - tenantID: The tenant ID for the stream +// - stream: The logproto.Stream to be encoded +// - maxSize: The maximum size of each Kafka record +func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) { reqSize := stream.Size() // Fast path for small requests - if reqSize <= e.maxSize { - rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, stream, reqSize) + if reqSize <= maxSize { + rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, stream) if err != nil { return nil, err } @@ -88,14 +73,14 @@ func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Str entrySize := 1 + l + sovPush(uint64(l)) // Check if a single entry is too big - if entrySize > e.maxSize || (i == 0 && currentSize+entrySize > e.maxSize) { - return nil, fmt.Errorf("single entry size (%d) exceeds maximum allowed size (%d)", entrySize, e.maxSize) + if entrySize > maxSize || (i == 0 && currentSize+entrySize > maxSize) { + return nil, fmt.Errorf("single entry size (%d) exceeds maximum allowed size (%d)", entrySize, maxSize) } - if currentSize+entrySize > e.maxSize { + if currentSize+entrySize > maxSize { // Current stream is full, create a record and start a new stream if len(e.batch.Entries) > 0 { - rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch, currentSize) + rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch) if err != nil { return nil, err } @@ -111,7 +96,7 @@ func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Str // Handle any remaining entries if len(e.batch.Entries) > 0 { - rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch, currentSize) + rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch) if err != nil { return nil, err } @@ -125,83 +110,46 @@ func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Str return records, nil } -func (e *Encoder) SetMaxRecordSize(maxSize int) { - e.maxSize = maxSize -} - -func (e *Encoder) marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream, size int) (*kgo.Record, error) { - // todo(cyriltovena): We could consider a better format to store the data avoiding all the allocations. - // Using Apache Arrow could be a good option. 
- e.buff = e.buff[:size] - n, err := stream.MarshalToSizedBuffer(e.buff) +func (e *Encoder) marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream) (*kgo.Record, error) { + data, err := stream.Marshal() if err != nil { - return nil, fmt.Errorf("failed to serialise write request: %w", err) - } - e.buff = e.buff[:n] - buffer := bytes.NewBuffer(make([]byte, 0, n/2)) - e.writer.Reset(buffer) - if _, err := e.writer.Write(e.buff); err != nil { - return nil, fmt.Errorf("failed to write data to buffer: %w", err) - } - if err := e.writer.Flush(); err != nil { - return nil, fmt.Errorf("failed to flush writer: %w", err) + return nil, fmt.Errorf("failed to marshal stream: %w", err) } return &kgo.Record{ - // We don't partition based on the key, so the value here doesn't make any difference. Key: []byte(tenantID), - Value: buffer.Bytes(), + Value: data, Partition: partitionID, }, nil } // Decoder is responsible for decoding Kafka record data back into logproto.Stream format. -// It handles decompression and caches parsed labels for efficiency. +// It caches parsed labels for efficiency. type Decoder struct { - stream *logproto.Stream - cache *lru.Cache - snappReader *snappy.Reader - reader *bytes.Reader - - buff *bytes.Buffer + stream *logproto.Stream + cache *lru.Cache } -// NewDecoder creates a new Decoder with default settings. func NewDecoder() (*Decoder, error) { - return NewDecoderWithExpectedSize(defaultMaxSize) -} - -// NewDecoderWithExpectedSize creates a new Decoder with a specified expected record size. -func NewDecoderWithExpectedSize(expectedSize int) (*Decoder, error) { cache, err := lru.New(5000) // Set LRU size to 5000, adjust as needed if err != nil { return nil, fmt.Errorf("failed to create LRU cache: %w", err) } return &Decoder{ - stream: &logproto.Stream{}, - cache: cache, - snappReader: snappy.NewReader(nil), - reader: bytes.NewReader(nil), - buff: bytes.NewBuffer(make([]byte, 0, expectedSize)), + stream: &logproto.Stream{}, + cache: cache, }, nil } // Decode converts a Kafka record's byte data back into a logproto.Stream and labels.Labels. // The decoding process works as follows: -// 1. Decompress the data using Snappy decompression. -// 2. Unmarshal the decompressed data into a logproto.Stream. -// 3. Parse and cache the labels for efficiency in future decodes. +// 1. Unmarshal the data into a logproto.Stream. +// 2. Parse and cache the labels for efficiency in future decodes. // // Returns the decoded logproto.Stream, parsed labels, and any error encountered. func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) { d.stream.Entries = d.stream.Entries[:0] - d.reader.Reset(data) - d.snappReader.Reset(d.reader) - d.buff.Reset() - if _, err := d.buff.ReadFrom(d.snappReader); err != nil { - return logproto.Stream{}, nil, fmt.Errorf("failed to read from snappy reader: %w", err) - } - if err := d.stream.Unmarshal(d.buff.Bytes()); err != nil { + if err := d.stream.Unmarshal(data); err != nil { return logproto.Stream{}, nil, fmt.Errorf("failed to unmarshal stream: %w", err) } @@ -223,14 +171,6 @@ func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) { // sovPush calculates the size of varint-encoded uint64. // It is used to determine the number of bytes needed to encode a uint64 value // in Protocol Buffers' variable-length integer format. -// -// The function works as follows: -// 1. It uses math_bits.Len64 to find the position of the most significant bit. -// 2. 
It adds 1 to ensure non-zero values are handled correctly. -// 3. It adds 6 and divides by 7 to calculate the number of 7-bit groups needed. -// -// This is an optimization for Protocol Buffers encoding, avoiding the need to -// actually encode the number to determine its encoded size. func sovPush(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } diff --git a/pkg/kafka/encoding_test.go b/pkg/kafka/encoding_test.go index f15d791de117..dc8f18a05444 100644 --- a/pkg/kafka/encoding_test.go +++ b/pkg/kafka/encoding_test.go @@ -34,11 +34,11 @@ func TestEncoderDecoder(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - encoder := NewEncoderWithMaxSize(tt.maxSize) + encoder := NewEncoder() decoder, err := NewDecoder() require.NoError(t, err) - records, err := encoder.Encode(0, "test-tenant", tt.stream) + records, err := encoder.Encode(0, "test-tenant", tt.stream, tt.maxSize) require.NoError(t, err) if tt.expectSplit { @@ -72,10 +72,10 @@ func TestEncoderDecoder(t *testing.T) { } func TestEncoderSingleEntryTooLarge(t *testing.T) { - encoder := NewEncoderWithMaxSize(100) + encoder := NewEncoder() stream := generateStream(1, 1000) - _, err := encoder.Encode(0, "test-tenant", stream) + _, err := encoder.Encode(0, "test-tenant", stream, 100) require.Error(t, err) require.Contains(t, err.Error(), "single entry size") } @@ -97,7 +97,7 @@ func TestEncoderDecoderEmptyStream(t *testing.T) { Labels: `{app="test"}`, } - records, err := encoder.Encode(0, "test-tenant", stream) + records, err := encoder.Encode(0, "test-tenant", stream, 10<<20) require.NoError(t, err) require.Len(t, records, 1) @@ -114,7 +114,7 @@ func BenchmarkEncodeDecode(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - records, err := encoder.Encode(0, "test-tenant", stream) + records, err := encoder.Encode(0, "test-tenant", stream, 10<<20) if err != nil { b.Fatal(err) } diff --git a/pkg/kafka/ingester/consumer_test.go b/pkg/kafka/ingester/consumer_test.go index d7a97b8f7eff..ed0c0ca01746 100644 --- a/pkg/kafka/ingester/consumer_test.go +++ b/pkg/kafka/ingester/consumer_test.go @@ -49,7 +49,7 @@ func TestConsumer(t *testing.T) { var records []record for i, stream := range streams { // Encode the stream - encodedRecords, err := encoder.Encode(int32(i), fmt.Sprintf("tenant%d", i+1), stream) + encodedRecords, err := encoder.Encode(int32(i), fmt.Sprintf("tenant%d", i+1), stream, 10<<20) require.NoError(t, err) // Convert encoded records to our test record format diff --git a/pkg/kafka/ingester/partition_reader_test.go b/pkg/kafka/ingester/partition_reader_test.go index 208c3a8199cf..3126d1488a7f 100644 --- a/pkg/kafka/ingester/partition_reader_test.go +++ b/pkg/kafka/ingester/partition_reader_test.go @@ -53,7 +53,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) { } encoder := kafka.NewEncoder() - records, err := encoder.Encode(0, "test-tenant", stream) + records, err := encoder.Encode(0, "test-tenant", stream, 10<<20) require.NoError(t, err) require.Len(t, records, 1) @@ -89,7 +89,7 @@ func TestPartitionReader_ConsumerError(t *testing.T) { } encoder := kafka.NewEncoder() - records, err := encoder.Encode(0, "test-tenant", stream) + records, err := encoder.Encode(0, "test-tenant", stream, 10<<20) require.NoError(t, err) consumerError := errors.New("consumer error") @@ -129,7 +129,7 @@ func TestPartitionReader_FlushAndCommit(t *testing.T) { } encoder := kafka.NewEncoder() - records, err := encoder.Encode(0, "test-tenant", stream) + records, err := encoder.Encode(0, "test-tenant", 
stream)
+	records, err := encoder.Encode(0, "test-tenant",
stream, 10<<20) require.NoError(t, err) consumer.On("Consume", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -210,9 +210,9 @@ func TestPartitionReader_MultipleRecords(t *testing.T) { } encoder := kafka.NewEncoder() - records1, err := encoder.Encode(0, "test-tenant", stream1) + records1, err := encoder.Encode(0, "test-tenant", stream1, 10<<20) require.NoError(t, err) - records2, err := encoder.Encode(0, "test-tenant", stream2) + records2, err := encoder.Encode(0, "test-tenant", stream2, 10<<20) require.NoError(t, err) consumer.On("Consume", mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/pkg/kafka/tee/tee.go b/pkg/kafka/tee/tee.go index 3c6a45eaae6d..2fc1c6610dc8 100644 --- a/pkg/kafka/tee/tee.go +++ b/pkg/kafka/tee/tee.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/twmb/franz-go/pkg/kgo" "github.com/grafana/loki/v3/pkg/distributor" "github.com/grafana/loki/v3/pkg/kafka" @@ -26,7 +27,10 @@ type Tee struct { partitionRing ring.PartitionRingReader cfg kafka.Config - ingesterAppends *prometheus.CounterVec + ingesterAppends *prometheus.CounterVec + writeLatency prometheus.Histogram + writeBytesTotal prometheus.Counter + recordsPerRequest prometheus.Histogram } // NewTee creates and initializes a new Tee instance. @@ -54,17 +58,35 @@ func NewTee( return nil, fmt.Errorf("failed to start kafka client: %w", err) } producer := kafka.NewProducer(kafkaClient, cfg.ProducerMaxBufferedBytes, - prometheus.WrapRegistererWithPrefix(metricsNamespace+"_kafka_ingester_", registerer)) + prometheus.WrapRegistererWithPrefix("_kafka_ingester_", registerer)) t := &Tee{ logger: log.With(logger, "component", "kafka-tee"), ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "loki_kafka_ingester_appends_total", + Name: "kafka_ingester_appends_total", Help: "The total number of appends sent to kafka ingest path.", }, []string{"partition", "status"}), producer: producer, partitionRing: partitionRing, cfg: cfg, + // Metrics. 
+ writeLatency: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_ingester_tee_latency_seconds", + Help: "Latency to write an incoming request to the ingest storage.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMinResetDuration: 1 * time.Hour, + NativeHistogramMaxBucketNumber: 100, + Buckets: prometheus.DefBuckets, + }), + writeBytesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "kafka_ingester_tee_sent_bytes_total", + Help: "Total number of bytes sent to the ingest storage.", + }), + recordsPerRequest: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_ingester_tee_records_per_write_request", + Help: "The number of records a single per-partition write request has been split into.", + Buckets: prometheus.ExponentialBuckets(1, 2, 8), + }), } return t, nil @@ -106,25 +128,36 @@ var encoderPool = sync.Pool{ // Returns: // - An error if the stream couldn't be sent successfully func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { + if len(stream.Stream.Entries) == 0 { + return nil + } partitionID, err := t.partitionRing.PartitionRing().ActivePartitionForKey(stream.HashKey) if err != nil { t.ingesterAppends.WithLabelValues("partition_unknown", "fail").Inc() return fmt.Errorf("failed to find active partition for stream: %w", err) } + startTime := time.Now() encoder := encoderPool.Get().(*kafka.Encoder) - encoder.SetMaxRecordSize(t.cfg.ProducerMaxRecordSizeBytes) - records, err := encoder.Encode(partitionID, tenant, stream.Stream) + records, err := encoder.Encode(partitionID, tenant, stream.Stream, t.cfg.ProducerMaxRecordSizeBytes) if err != nil { t.ingesterAppends.WithLabelValues(fmt.Sprintf("partition_%d", partitionID), "fail").Inc() + encoderPool.Put(encoder) return fmt.Errorf("failed to marshal write request to records: %w", err) } encoderPool.Put(encoder) + t.recordsPerRequest.Observe(float64(len(records))) + ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), writeTimeout) defer cancel() produceResults := t.producer.ProduceSync(ctx, records) + if count, sizeBytes := successfulProduceRecordsStats(produceResults); count > 0 { + t.writeLatency.Observe(time.Since(startTime).Seconds()) + t.writeBytesTotal.Add(float64(sizeBytes)) + } + var finalErr error for _, result := range produceResults { if result.Err != nil { @@ -137,3 +170,14 @@ func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { return finalErr } + +func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes int) { + for _, res := range results { + if res.Err == nil && res.Record != nil { + count++ + sizeBytes += len(res.Record.Value) + } + } + + return +}
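
With this change the record size limit moves from the Encoder constructor to the Encode call, and record values are plain protobuf rather than snappy-compressed. A minimal round-trip sketch of the new encode/decode API, assuming the import paths used in the tests above; the tenant ID, labels, and 10<<20 limit are placeholder values, not part of this patch:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	stream := logproto.Stream{
		Labels: `{app="example"}`,
		Entries: []logproto.Entry{
			{Timestamp: time.Now(), Line: "hello world"},
		},
	}

	// The maximum record size is now a per-call argument instead of Encoder state.
	encoder := kafka.NewEncoder()
	records, err := encoder.Encode(0, "tenant-1", stream, 10<<20)
	if err != nil {
		panic(err)
	}

	decoder, err := kafka.NewDecoder()
	if err != nil {
		panic(err)
	}
	for _, rec := range records {
		// Record values are plain protobuf now; there is no snappy layer to undo.
		decoded, lbls, err := decoder.Decode(rec.Value)
		if err != nil {
			panic(err)
		}
		fmt.Println(lbls.String(), len(decoded.Entries))
	}
}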
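
Encode's splitting loop sizes each entry as 1 + l + sovPush(uint64(l)): one byte for the entries field tag, the marshalled entry, and its varint length prefix. A small standalone check of the retained varint-size helper against encoding/binary, shown as an illustration rather than taken from this patch:

package main

import (
	"encoding/binary"
	"fmt"
	"math/bits"
)

// sovPush mirrors the helper kept in encoding.go: the number of bytes needed
// to varint-encode x.
func sovPush(x uint64) int {
	return (bits.Len64(x|1) + 6) / 7
}

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	for _, x := range []uint64{0, 1, 127, 128, 16383, 16384, 1 << 40} {
		// PutUvarint returns the number of bytes it actually wrote.
		fmt.Printf("x=%-8d sovPush=%d PutUvarint=%d\n", x, sovPush(x), binary.PutUvarint(buf, x))
	}
}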
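
sendStream now returns the pooled encoder to the pool on the error path as well as on success. A sketch of the same pooling pattern with a deferred Put; the helper name and the defer are illustrative and not how tee.go writes it:

package main

import (
	"fmt"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// encoderPool mirrors the pool declared in tee.go.
var encoderPool = sync.Pool{
	New: func() any { return kafka.NewEncoder() },
}

// encodeWithPool is a hypothetical helper: the deferred Put guarantees the
// encoder goes back to the pool on error paths too.
func encodeWithPool(partitionID int32, tenant string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) {
	enc := encoderPool.Get().(*kafka.Encoder)
	defer encoderPool.Put(enc)
	return enc.Encode(partitionID, tenant, stream, maxSize)
}

func main() {
	records, err := encodeWithPool(0, "tenant-1", logproto.Stream{Labels: `{app="example"}`}, 10<<20)
	fmt.Println(len(records), err)
}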
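
Since per-record snappy compression is gone, any remaining need for wire compression could be met by franz-go's batch-level compression on the producing client, which brokers and consumers decompress transparently. A hypothetical client option sketch, assuming a placeholder broker address; this is not something the patch configures:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func newCompressingClient() (*kgo.Client, error) {
	return kgo.NewClient(
		// Placeholder broker address.
		kgo.SeedBrokers("localhost:9092"),
		// Compress whole producer batches; record values stay plain protobuf.
		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
	)
}

func main() {
	client, err := newCompressingClient()
	if err != nil {
		panic(err)
	}
	defer client.Close()
}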