Skip to content

Commit

Permalink
chore: Move compression utilities into separate package (#14167)
Browse files Browse the repository at this point in the history
Compression tooling has been part of the `chunkenc` (chunk encoding) package in the past for legacy reasons.

Since more components use this now, it's easier to keep it in a separate package. This also eliminates the confusion around "encoding", since this has been incorrectly used synonymously with "compression" in the past.

---
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Robert Fratto <robertfratto@gmail.com>
  • Loading branch information
chaudum authored Sep 19, 2024
1 parent ce2e6d5 commit 9637790
Show file tree
Hide file tree
Showing 54 changed files with 776 additions and 714 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -333,7 +333,7 @@ func (b *Builder) processTask(
return nil, fmt.Errorf("failed to get client: %w", err)
}

blockEnc, err := chunkenc.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
Expand Down Expand Up @@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser

func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
for _, enc := range []chunkenc.Encoding{chunkenc.EncNone, chunkenc.EncGZIP, chunkenc.EncSnappy} {
for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} {
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
baseStore "github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB(
}
defer data.Close()

decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP)
decompressorPool := compression.GetReaderPool(compression.EncGZIP)
decompressor, err := decompressorPool.GetReader(data)
if err != nil {
return nil, errors.Wrap(err, "failed to get decompressor")
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0)
blockOpts := v1.NewBlockOptions(compression.EncNone, 4, 1, 0, 0)

builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sort"
"time"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}

func (c *dumbChunk) Encoding() Encoding { return EncNone }
func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone }

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
Expand Down
84 changes: 2 additions & 82 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"errors"
"fmt"
"io"
"strings"
"time"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
Expand Down Expand Up @@ -48,86 +48,6 @@ func IsOutOfOrderErr(err error) bool {
return err == ErrOutOfOrder || IsErrTooFarBehind(err)
}

// Encoding is the identifier for a chunk encoding.
type Encoding byte

// The different available encodings.
// Make sure to preserve the order, as these numeric values are written to the chunks!
const (
EncNone Encoding = iota
EncGZIP
EncDumb
EncLZ4_64k
EncSnappy
EncLZ4_256k
EncLZ4_1M
EncLZ4_4M
EncFlate
EncZstd
)

var supportedEncoding = []Encoding{
EncNone,
EncGZIP,
EncLZ4_64k,
EncSnappy,
EncLZ4_256k,
EncLZ4_1M,
EncLZ4_4M,
EncFlate,
EncZstd,
}

func (e Encoding) String() string {
switch e {
case EncGZIP:
return "gzip"
case EncNone:
return "none"
case EncDumb:
return "dumb"
case EncLZ4_64k:
return "lz4-64k"
case EncLZ4_256k:
return "lz4-256k"
case EncLZ4_1M:
return "lz4-1M"
case EncLZ4_4M:
return "lz4"
case EncSnappy:
return "snappy"
case EncFlate:
return "flate"
case EncZstd:
return "zstd"
default:
return "unknown"
}
}

// ParseEncoding parses an chunk encoding (compression algorithm) by its name.
func ParseEncoding(enc string) (Encoding, error) {
for _, e := range supportedEncoding {
if strings.EqualFold(e.String(), enc) {
return e, nil
}
}
return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding())

}

// SupportedEncoding returns the list of supported Encoding.
func SupportedEncoding() string {
var sb strings.Builder
for i := range supportedEncoding {
sb.WriteString(supportedEncoding[i].String())
if i != len(supportedEncoding)-1 {
sb.WriteString(", ")
}
}
return sb.String()
}

// Chunk is the interface for the compressed logs chunk format.
type Chunk interface {
Bounds() (time.Time, time.Time)
Expand All @@ -148,7 +68,7 @@ type Chunk interface {
UncompressedSize() int
CompressedSize() int
Close() error
Encoding() Encoding
Encoding() compression.Encoding
Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

Expand Down
23 changes: 0 additions & 23 deletions pkg/chunkenc/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestParseEncoding(t *testing.T) {
tests := []struct {
enc string
want Encoding
wantErr bool
}{
{"gzip", EncGZIP, false},
{"bad", 0, true},
}
for _, tt := range tests {
t.Run(tt.enc, func(t *testing.T) {
got, err := ParseEncoding(tt.enc)
if (err != nil) != tt.wantErr {
t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("ParseEncoding() = %v, want %v", got, tt.want)
}
})
}
}

func TestIsOutOfOrderErr(t *testing.T) {
now := time.Now()

Expand Down
35 changes: 18 additions & 17 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
Expand Down Expand Up @@ -131,7 +132,7 @@ type MemChunk struct {
head HeadBlock

format byte
encoding Encoding
encoding compression.Encoding
headFmt HeadBlockFmt

// compressed size of chunk. Set when chunk is cut or while decoding chunk from storage.
Expand Down Expand Up @@ -196,7 +197,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error
return false, nil
}

func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
func (hb *headBlock) Serialise(pool compression.WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
Expand Down Expand Up @@ -354,7 +355,7 @@ type entry struct {
}

// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize)
}

Expand All @@ -369,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
}

// NewMemChunk returns a new in-mem chunk.
func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
panicIfInvalidFormat(format, head)

symbolizer := newSymbolizer()
Expand Down Expand Up @@ -413,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
bc.format = version
switch version {
case ChunkFormatV1:
bc.encoding = EncGZIP
bc.encoding = compression.EncGZIP
case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4:
// format v2+ has a byte for block encoding.
enc := Encoding(db.byte())
enc := compression.Encoding(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
}
Expand Down Expand Up @@ -535,7 +536,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
if fromCheckpoint {
bc.symbolizer = symbolizerFromCheckpoint(lb)
} else {
symbolizer, err := symbolizerFromEnc(lb, GetReaderPool(bc.encoding))
symbolizer, err := symbolizerFromEnc(lb, compression.GetReaderPool(bc.encoding))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -653,7 +654,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
}
} else {
var err error
n, crcHash, err = c.symbolizer.SerializeTo(w, GetWriterPool(c.encoding))
n, crcHash, err = c.symbolizer.SerializeTo(w, compression.GetWriterPool(c.encoding))
if err != nil {
return offset, errors.Wrap(err, "write structured metadata")
}
Expand Down Expand Up @@ -776,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt
}

// Encoding implements Chunk.
func (c *MemChunk) Encoding() Encoding {
func (c *MemChunk) Encoding() compression.Encoding {
return c.encoding
}

Expand Down Expand Up @@ -941,7 +942,7 @@ func (c *MemChunk) cut() error {
return nil
}

b, err := c.head.Serialise(GetWriterPool(c.encoding))
b, err := c.head.Serialise(compression.GetWriterPool(c.encoding))
if err != nil {
return err
}
Expand Down Expand Up @@ -1172,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err
// then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the
// chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former.
type encBlock struct {
enc Encoding
enc compression.Encoding
format byte
symbolizer *symbolizer
block
Expand All @@ -1182,14 +1183,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) ite
if len(b.b) == 0 {
return iter.NoopEntryIterator
}
return newEntryIterator(ctx, GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer)
return newEntryIterator(ctx, compression.GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer)
}

func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopSampleIterator
}
return newSampleIterator(ctx, GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
return newSampleIterator(ctx, compression.GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
}

func (b block) Offset() int {
Expand Down Expand Up @@ -1339,7 +1340,7 @@ type bufferedIterator struct {
stats *stats.Context

reader io.Reader
pool ReaderPool
pool compression.ReaderPool
symbolizer *symbolizer

err error
Expand All @@ -1358,7 +1359,7 @@ type bufferedIterator struct {
closed bool
}

func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator {
func newBufferedIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator {
stats := stats.FromContext(ctx)
stats.AddCompressedBytes(int64(len(b)))
return &bufferedIterator{
Expand Down Expand Up @@ -1619,7 +1620,7 @@ func (si *bufferedIterator) close() {
si.origBytes = nil
}

func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator {
func newEntryIterator(ctx context.Context, pool compression.ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator {
return &entryBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
pipeline: pipeline,
Expand Down Expand Up @@ -1671,7 +1672,7 @@ func (e *entryBufferedIterator) Close() error {
return e.bufferedIterator.Close()
}

func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
func newSampleIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator {
return &sampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
extractor: extractor,
Expand Down
Loading

0 comments on commit 9637790

Please sign in to comment.