Remove snappy compression and add more metrics
cyriltovena committed Sep 6, 2024
1 parent f27711c commit d8dbf09
Showing 5 changed files with 93 additions and 109 deletions.
124 changes: 32 additions & 92 deletions pkg/kafka/encoding.go
@@ -2,12 +2,10 @@
package kafka

import (
"bytes"
"errors"
"fmt"
math_bits "math/bits"

"github.com/golang/snappy"
"github.com/twmb/franz-go/pkg/kgo"

lru "github.com/hashicorp/golang-lru"
@@ -17,33 +15,15 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
)

const defaultMaxSize = 10 << 20 // 10 MB

// Encoder is responsible for encoding logproto.Stream data into Kafka records.
// It handles compression and splitting of large streams into multiple records.
// It handles splitting of large streams into multiple records.
type Encoder struct {
writer *snappy.Writer
buff []byte
batch logproto.Stream
maxSize int
}

// NewEncoderWithMaxSize creates a new Encoder with a specified maximum record size.
// If maxSize is <= 0, it defaults to 10MB.
func NewEncoderWithMaxSize(maxSize int) *Encoder {
if maxSize <= 0 {
maxSize = defaultMaxSize
}
return &Encoder{
writer: snappy.NewBufferedWriter(nil),
buff: make([]byte, 0, maxSize),
maxSize: maxSize,
}
batch logproto.Stream
}

// NewEncoder creates a new Encoder with the default maximum record size of 10MB.
// NewEncoder creates a new Encoder.
func NewEncoder() *Encoder {
return NewEncoderWithMaxSize(defaultMaxSize)
return &Encoder{}
}

// Encode converts a logproto.Stream into one or more Kafka records.
@@ -52,19 +32,24 @@ func NewEncoder() *Encoder {
// The encoding process works as follows:
// 1. If the stream size is smaller than maxSize, it's encoded into a single record.
// 2. For larger streams, it splits the entries into multiple batches, each under maxSize.
// 3. Each batch is compressed using Snappy compression.
// 4. The compressed data is wrapped in a Kafka record with the tenant ID as the key.
// 3. The data is wrapped in a Kafka record with the tenant ID as the key.
//
// The format of each record is:
// - Key: Tenant ID (used for routing, not for partitioning)
// - Value: Snappy-compressed protobuf serialized logproto.Stream
// - Value: Protobuf serialized logproto.Stream
// - Partition: As specified in the partitionID parameter
func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Stream) ([]*kgo.Record, error) {
//
// Parameters:
// - partitionID: The Kafka partition ID for the record
// - tenantID: The tenant ID for the stream
// - stream: The logproto.Stream to be encoded
// - maxSize: The maximum size of each Kafka record
func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Stream, maxSize int) ([]*kgo.Record, error) {
reqSize := stream.Size()

// Fast path for small requests
if reqSize <= e.maxSize {
rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, stream, reqSize)
if reqSize <= maxSize {
rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, stream)
if err != nil {
return nil, err
}
@@ -88,14 +73,14 @@ func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Str
entrySize := 1 + l + sovPush(uint64(l))

// Check if a single entry is too big
if entrySize > e.maxSize || (i == 0 && currentSize+entrySize > e.maxSize) {
return nil, fmt.Errorf("single entry size (%d) exceeds maximum allowed size (%d)", entrySize, e.maxSize)
if entrySize > maxSize || (i == 0 && currentSize+entrySize > maxSize) {
return nil, fmt.Errorf("single entry size (%d) exceeds maximum allowed size (%d)", entrySize, maxSize)
}

if currentSize+entrySize > e.maxSize {
if currentSize+entrySize > maxSize {
// Current stream is full, create a record and start a new stream
if len(e.batch.Entries) > 0 {
rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch, currentSize)
rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch)
if err != nil {
return nil, err
}
@@ -111,7 +96,7 @@ func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Str

// Handle any remaining entries
if len(e.batch.Entries) > 0 {
rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch, currentSize)
rec, err := e.marshalWriteRequestToRecord(partitionID, tenantID, e.batch)
if err != nil {
return nil, err
}
@@ -125,83 +110,46 @@ func (e *Encoder) Encode(partitionID int32, tenantID string, stream logproto.Str
return records, nil
}

func (e *Encoder) SetMaxRecordSize(maxSize int) {
e.maxSize = maxSize
}

func (e *Encoder) marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream, size int) (*kgo.Record, error) {
// todo(cyriltovena): We could consider a better format to store the data avoiding all the allocations.
// Using Apache Arrow could be a good option.
e.buff = e.buff[:size]
n, err := stream.MarshalToSizedBuffer(e.buff)
func (e *Encoder) marshalWriteRequestToRecord(partitionID int32, tenantID string, stream logproto.Stream) (*kgo.Record, error) {
data, err := stream.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to serialise write request: %w", err)
}
e.buff = e.buff[:n]
buffer := bytes.NewBuffer(make([]byte, 0, n/2))
e.writer.Reset(buffer)
if _, err := e.writer.Write(e.buff); err != nil {
return nil, fmt.Errorf("failed to write data to buffer: %w", err)
}
if err := e.writer.Flush(); err != nil {
return nil, fmt.Errorf("failed to flush writer: %w", err)
return nil, fmt.Errorf("failed to marshal stream: %w", err)
}

return &kgo.Record{
// We don't partition based on the key, so the value here doesn't make any difference.
Key: []byte(tenantID),
Value: buffer.Bytes(),
Value: data,
Partition: partitionID,
}, nil
}

// Decoder is responsible for decoding Kafka record data back into logproto.Stream format.
// It handles decompression and caches parsed labels for efficiency.
// It caches parsed labels for efficiency.
type Decoder struct {
stream *logproto.Stream
cache *lru.Cache
snappReader *snappy.Reader
reader *bytes.Reader

buff *bytes.Buffer
stream *logproto.Stream
cache *lru.Cache
}

// NewDecoder creates a new Decoder with default settings.
func NewDecoder() (*Decoder, error) {
return NewDecoderWithExpectedSize(defaultMaxSize)
}

// NewDecoderWithExpectedSize creates a new Decoder with a specified expected record size.
func NewDecoderWithExpectedSize(expectedSize int) (*Decoder, error) {
cache, err := lru.New(5000) // Set LRU size to 5000, adjust as needed
if err != nil {
return nil, fmt.Errorf("failed to create LRU cache: %w", err)
}
return &Decoder{
stream: &logproto.Stream{},
cache: cache,
snappReader: snappy.NewReader(nil),
reader: bytes.NewReader(nil),
buff: bytes.NewBuffer(make([]byte, 0, expectedSize)),
stream: &logproto.Stream{},
cache: cache,
}, nil
}

// Decode converts a Kafka record's byte data back into a logproto.Stream and labels.Labels.
// The decoding process works as follows:
// 1. Decompress the data using Snappy decompression.
// 2. Unmarshal the decompressed data into a logproto.Stream.
// 3. Parse and cache the labels for efficiency in future decodes.
// 1. Unmarshal the data into a logproto.Stream.
// 2. Parse and cache the labels for efficiency in future decodes.
//
// Returns the decoded logproto.Stream, parsed labels, and any error encountered.
func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) {
d.stream.Entries = d.stream.Entries[:0]
d.reader.Reset(data)
d.snappReader.Reset(d.reader)
d.buff.Reset()
if _, err := d.buff.ReadFrom(d.snappReader); err != nil {
return logproto.Stream{}, nil, fmt.Errorf("failed to read from snappy reader: %w", err)
}
if err := d.stream.Unmarshal(d.buff.Bytes()); err != nil {
if err := d.stream.Unmarshal(data); err != nil {
return logproto.Stream{}, nil, fmt.Errorf("failed to unmarshal stream: %w", err)
}

@@ -223,14 +171,6 @@ func (d *Decoder) Decode(data []byte) (logproto.Stream, labels.Labels, error) {
// sovPush calculates the size of varint-encoded uint64.
// It is used to determine the number of bytes needed to encode a uint64 value
// in Protocol Buffers' variable-length integer format.
//
// The function works as follows:
// 1. It uses math_bits.Len64 to find the position of the most significant bit.
// 2. It adds 1 to ensure non-zero values are handled correctly.
// 3. It adds 6 and divides by 7 to calculate the number of 7-bit groups needed.
//
// This is an optimization for Protocol Buffers encoding, avoiding the need to
// actually encode the number to determine its encoded size.
func sovPush(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
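
For reference, sovPush above is the standard protobuf varint-size helper, and Encode's per-entry size estimate is 1 + l + sovPush(uint64(l)) (tag byte, payload, length varint). A minimal, self-contained sketch of the same arithmetic; the demo main and sample values are illustrative and not part of this change:

```go
package main

import (
	"fmt"
	math_bits "math/bits"
)

// sovPush mirrors the helper in pkg/kafka/encoding.go: the number of bytes a
// uint64 occupies when encoded as a protobuf varint (7 payload bits per byte).
func sovPush(x uint64) (n int) {
	return (math_bits.Len64(x|1) + 6) / 7
}

func main() {
	for _, v := range []uint64{0, 127, 128, 16383, 16384} {
		fmt.Printf("varint size of %d = %d byte(s)\n", v, sovPush(v))
	}
	// Prints 1, 1, 2, 2, 3: values up to 127 fit in one byte, then one more
	// byte is needed for each additional 7 bits.
}
```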
12 changes: 6 additions & 6 deletions pkg/kafka/encoding_test.go
@@ -34,11 +34,11 @@ func TestEncoderDecoder(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
encoder := NewEncoderWithMaxSize(tt.maxSize)
encoder := NewEncoder()
decoder, err := NewDecoder()
require.NoError(t, err)

records, err := encoder.Encode(0, "test-tenant", tt.stream)
records, err := encoder.Encode(0, "test-tenant", tt.stream, tt.maxSize)
require.NoError(t, err)

if tt.expectSplit {
@@ -72,10 +72,10 @@ func TestEncoderDecoder(t *testing.T) {
}

func TestEncoderSingleEntryTooLarge(t *testing.T) {
encoder := NewEncoderWithMaxSize(100)
encoder := NewEncoder()
stream := generateStream(1, 1000)

_, err := encoder.Encode(0, "test-tenant", stream)
_, err := encoder.Encode(0, "test-tenant", stream, 100)
require.Error(t, err)
require.Contains(t, err.Error(), "single entry size")
}
@@ -97,7 +97,7 @@ func TestEncoderDecoderEmptyStream(t *testing.T) {
Labels: `{app="test"}`,
}

records, err := encoder.Encode(0, "test-tenant", stream)
records, err := encoder.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

@@ -114,7 +114,7 @@ func BenchmarkEncodeDecode(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
records, err := encoder.Encode(0, "test-tenant", stream)
records, err := encoder.Encode(0, "test-tenant", stream, 10<<20)
if err != nil {
b.Fatal(err)
}
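
Outside the updated tests, the round trip under the new API looks roughly like the sketch below. This is a hedged illustration: the import paths (the kafka package under github.com/grafana/loki/v3/pkg/kafka) and the sample stream are assumptions inferred from this diff, not code from the repository:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	enc := kafka.NewEncoder() // no max record size at construction anymore
	dec, err := kafka.NewDecoder()
	if err != nil {
		panic(err)
	}

	stream := logproto.Stream{
		Labels: `{app="test"}`,
		Entries: []logproto.Entry{
			{Timestamp: time.Now(), Line: "hello world"},
		},
	}

	// maxSize is now passed per Encode call; streams larger than it are split
	// across several records, each carrying a plain protobuf payload (no snappy).
	records, err := enc.Encode(0, "test-tenant", stream, 10<<20)
	if err != nil {
		panic(err)
	}

	for _, rec := range records {
		decoded, lbls, err := dec.Decode(rec.Value)
		if err != nil {
			panic(err)
		}
		fmt.Println(lbls.String(), len(decoded.Entries))
	}
}
```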
2 changes: 1 addition & 1 deletion pkg/kafka/ingester/consumer_test.go
@@ -49,7 +49,7 @@ func TestConsumer(t *testing.T) {
var records []record
for i, stream := range streams {
// Encode the stream
encodedRecords, err := encoder.Encode(int32(i), fmt.Sprintf("tenant%d", i+1), stream)
encodedRecords, err := encoder.Encode(int32(i), fmt.Sprintf("tenant%d", i+1), stream, 10<<20)
require.NoError(t, err)

// Convert encoded records to our test record format
10 changes: 5 additions & 5 deletions pkg/kafka/ingester/partition_reader_test.go
@@ -53,7 +53,7 @@ func TestPartitionReader_BasicFunctionality(t *testing.T) {
}
encoder := kafka.NewEncoder()

records, err := encoder.Encode(0, "test-tenant", stream)
records, err := encoder.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)
require.Len(t, records, 1)

@@ -89,7 +89,7 @@ func TestPartitionReader_ConsumerError(t *testing.T) {
}
encoder := kafka.NewEncoder()

records, err := encoder.Encode(0, "test-tenant", stream)
records, err := encoder.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)

consumerError := errors.New("consumer error")
@@ -129,7 +129,7 @@ func TestPartitionReader_FlushAndCommit(t *testing.T) {
}
encoder := kafka.NewEncoder()

records, err := encoder.Encode(0, "test-tenant", stream)
records, err := encoder.Encode(0, "test-tenant", stream, 10<<20)
require.NoError(t, err)

consumer.On("Consume", mock.Anything, mock.Anything, mock.Anything).Return(nil)
@@ -210,9 +210,9 @@ func TestPartitionReader_MultipleRecords(t *testing.T) {
}
encoder := kafka.NewEncoder()

records1, err := encoder.Encode(0, "test-tenant", stream1)
records1, err := encoder.Encode(0, "test-tenant", stream1, 10<<20)
require.NoError(t, err)
records2, err := encoder.Encode(0, "test-tenant", stream2)
records2, err := encoder.Encode(0, "test-tenant", stream2, 10<<20)
require.NoError(t, err)

consumer.On("Consume", mock.Anything, mock.Anything, mock.Anything).Return(nil)