diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 6cc2ecfa32f6..24710944f8e1 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -21,7 +21,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" @@ -333,7 +333,7 @@ func (b *Builder) processTask( return nil, fmt.Errorf("failed to get client: %w", err) } - blockEnc, err := chunkenc.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant)) + blockEnc, err := compression.ParseEncoding(b.limits.BloomBlockEncoding(task.Tenant)) if err != nil { return nil, fmt.Errorf("failed to parse block encoding: %w", err) } diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index 8ab6c2bba4f4..62e3f70bf22f 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -10,7 +10,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" v2 "github.com/grafana/loki/v3/pkg/iter/v2" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" @@ -115,7 +115,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Ser func TestSimpleBloomGenerator(t *testing.T) { const maxBlockSize = 100 << 20 // 100MB - for _, enc := range []chunkenc.Encoding{chunkenc.EncNone, chunkenc.EncGZIP, chunkenc.EncSnappy} { + for _, enc := range []compression.Encoding{compression.EncNone, compression.EncGZIP, compression.EncSnappy} { for _, tc := range []struct { desc string fromSchema, toSchema v1.BlockOptions diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go index a2e22529523b..ea31767cca0b 100644 --- a/pkg/bloombuild/common/tsdb.go +++ b/pkg/bloombuild/common/tsdb.go @@ -13,7 +13,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" baseStore "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" @@ -102,7 +102,7 @@ func (b *BloomTSDBStore) LoadTSDB( } defer data.Close() - decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP) + decompressorPool := compression.GetReaderPool(compression.EncGZIP) decompressor, err := decompressorPool.GetReader(data) if err != nil { return nil, errors.Wrap(err, "failed to get decompressor") diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 32f8d5798a7f..fbd3a7bac530 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -23,7 +23,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" @@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) reader := v1.NewByteReader(indexBuf, bloomsBuf) - blockOpts := v1.NewBlockOptions(chunkenc.EncNone, 4, 1, 0, 0) + blockOpts := v1.NewBlockOptions(compression.EncNone, 4, 1, 0, 0) builder, err := v1.NewBlockBuilder(blockOpts, writer) if err != nil { diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 33df4501927b..e28298605118 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -6,6 +6,7 @@ import ( "sort" "time" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -69,7 +70,7 @@ func (c *dumbChunk) Utilization() float64 { return float64(len(c.entries)) / float64(tmpNumEntries) } -func (c *dumbChunk) Encoding() Encoding { return EncNone } +func (c *dumbChunk) Encoding() compression.Encoding { return compression.EncNone } // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index f0b17c7750f3..057fc8b985ad 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -5,9 +5,9 @@ import ( "errors" "fmt" "io" - "strings" "time" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -48,86 +48,6 @@ func IsOutOfOrderErr(err error) bool { return err == ErrOutOfOrder || IsErrTooFarBehind(err) } -// Encoding is the identifier for a chunk encoding. -type Encoding byte - -// The different available encodings. -// Make sure to preserve the order, as these numeric values are written to the chunks! -const ( - EncNone Encoding = iota - EncGZIP - EncDumb - EncLZ4_64k - EncSnappy - EncLZ4_256k - EncLZ4_1M - EncLZ4_4M - EncFlate - EncZstd -) - -var supportedEncoding = []Encoding{ - EncNone, - EncGZIP, - EncLZ4_64k, - EncSnappy, - EncLZ4_256k, - EncLZ4_1M, - EncLZ4_4M, - EncFlate, - EncZstd, -} - -func (e Encoding) String() string { - switch e { - case EncGZIP: - return "gzip" - case EncNone: - return "none" - case EncDumb: - return "dumb" - case EncLZ4_64k: - return "lz4-64k" - case EncLZ4_256k: - return "lz4-256k" - case EncLZ4_1M: - return "lz4-1M" - case EncLZ4_4M: - return "lz4" - case EncSnappy: - return "snappy" - case EncFlate: - return "flate" - case EncZstd: - return "zstd" - default: - return "unknown" - } -} - -// ParseEncoding parses an chunk encoding (compression algorithm) by its name. -func ParseEncoding(enc string) (Encoding, error) { - for _, e := range supportedEncoding { - if strings.EqualFold(e.String(), enc) { - return e, nil - } - } - return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding()) - -} - -// SupportedEncoding returns the list of supported Encoding. -func SupportedEncoding() string { - var sb strings.Builder - for i := range supportedEncoding { - sb.WriteString(supportedEncoding[i].String()) - if i != len(supportedEncoding)-1 { - sb.WriteString(", ") - } - } - return sb.String() -} - // Chunk is the interface for the compressed logs chunk format. type Chunk interface { Bounds() (time.Time, time.Time) @@ -148,7 +68,7 @@ type Chunk interface { UncompressedSize() int CompressedSize() int Close() error - Encoding() Encoding + Encoding() compression.Encoding Rebound(start, end time.Time, filter filter.Func) (Chunk, error) } diff --git a/pkg/chunkenc/interface_test.go b/pkg/chunkenc/interface_test.go index ed81c4d3604e..8faed8e2c43f 100644 --- a/pkg/chunkenc/interface_test.go +++ b/pkg/chunkenc/interface_test.go @@ -7,29 +7,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestParseEncoding(t *testing.T) { - tests := []struct { - enc string - want Encoding - wantErr bool - }{ - {"gzip", EncGZIP, false}, - {"bad", 0, true}, - } - for _, tt := range tests { - t.Run(tt.enc, func(t *testing.T) { - got, err := ParseEncoding(tt.enc) - if (err != nil) != tt.wantErr { - t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != tt.want { - t.Errorf("ParseEncoding() = %v, want %v", got, tt.want) - } - }) - } -} - func TestIsOutOfOrderErr(t *testing.T) { now := time.Now() diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 328e91c94deb..03f33b817672 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -16,6 +16,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -131,7 +132,7 @@ type MemChunk struct { head HeadBlock format byte - encoding Encoding + encoding compression.Encoding headFmt HeadBlockFmt // compressed size of chunk. Set when chunk is cut or while decoding chunk from storage. @@ -196,7 +197,7 @@ func (hb *headBlock) Append(ts int64, line string, _ labels.Labels) (bool, error return false, nil } -func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) { +func (hb *headBlock) Serialise(pool compression.WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { inBuf.Reset() @@ -354,7 +355,7 @@ type entry struct { } // NewMemChunk returns a new in-mem chunk. -func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { +func NewMemChunk(chunkFormat byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { return newMemChunkWithFormat(chunkFormat, enc, head, blockSize, targetSize) } @@ -369,7 +370,7 @@ func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) { } // NewMemChunk returns a new in-mem chunk. -func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { +func newMemChunkWithFormat(format byte, enc compression.Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { panicIfInvalidFormat(format, head) symbolizer := newSymbolizer() @@ -413,10 +414,10 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me bc.format = version switch version { case ChunkFormatV1: - bc.encoding = EncGZIP + bc.encoding = compression.EncGZIP case ChunkFormatV2, ChunkFormatV3, ChunkFormatV4: // format v2+ has a byte for block encoding. - enc := Encoding(db.byte()) + enc := compression.Encoding(db.byte()) if db.err() != nil { return nil, errors.Wrap(db.err(), "verifying encoding") } @@ -535,7 +536,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me if fromCheckpoint { bc.symbolizer = symbolizerFromCheckpoint(lb) } else { - symbolizer, err := symbolizerFromEnc(lb, GetReaderPool(bc.encoding)) + symbolizer, err := symbolizerFromEnc(lb, compression.GetReaderPool(bc.encoding)) if err != nil { return nil, err } @@ -653,7 +654,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) { } } else { var err error - n, crcHash, err = c.symbolizer.SerializeTo(w, GetWriterPool(c.encoding)) + n, crcHash, err = c.symbolizer.SerializeTo(w, compression.GetWriterPool(c.encoding)) if err != nil { return offset, errors.Wrap(err, "write structured metadata") } @@ -776,7 +777,7 @@ func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt } // Encoding implements Chunk. -func (c *MemChunk) Encoding() Encoding { +func (c *MemChunk) Encoding() compression.Encoding { return c.encoding } @@ -941,7 +942,7 @@ func (c *MemChunk) cut() error { return nil } - b, err := c.head.Serialise(GetWriterPool(c.encoding)) + b, err := c.head.Serialise(compression.GetWriterPool(c.encoding)) if err != nil { return err } @@ -1172,7 +1173,7 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, err // then allows us to bind a decoding context to a block when requested, but otherwise helps reduce the // chances of chunk<>block encoding drift in the codebase as the latter is parameterized by the former. type encBlock struct { - enc Encoding + enc compression.Encoding format byte symbolizer *symbolizer block @@ -1182,14 +1183,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) ite if len(b.b) == 0 { return iter.NoopEntryIterator } - return newEntryIterator(ctx, GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer) + return newEntryIterator(ctx, compression.GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer) } func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { if len(b.b) == 0 { return iter.NoopSampleIterator } - return newSampleIterator(ctx, GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer) + return newSampleIterator(ctx, compression.GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer) } func (b block) Offset() int { @@ -1339,7 +1340,7 @@ type bufferedIterator struct { stats *stats.Context reader io.Reader - pool ReaderPool + pool compression.ReaderPool symbolizer *symbolizer err error @@ -1358,7 +1359,7 @@ type bufferedIterator struct { closed bool } -func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator { +func newBufferedIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, symbolizer *symbolizer) *bufferedIterator { stats := stats.FromContext(ctx) stats.AddCompressedBytes(int64(len(b))) return &bufferedIterator{ @@ -1619,7 +1620,7 @@ func (si *bufferedIterator) close() { si.origBytes = nil } -func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator { +func newEntryIterator(ctx context.Context, pool compression.ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator { return &entryBufferedIterator{ bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer), pipeline: pipeline, @@ -1671,7 +1672,7 @@ func (e *entryBufferedIterator) Close() error { return e.bufferedIterator.Close() } -func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator { +func newSampleIterator(ctx context.Context, pool compression.ReaderPool, b []byte, format byte, extractor log.StreamSampleExtractor, symbolizer *symbolizer) iter.SampleIterator { return &sampleBufferedIterator{ bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer), extractor: extractor, diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 85cccd743cfb..987a5d88b286 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/chunkenc/testdata" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -31,16 +32,16 @@ import ( "github.com/grafana/loki/v3/pkg/util/filter" ) -var testEncoding = []Encoding{ - EncNone, - EncGZIP, - EncLZ4_64k, - EncLZ4_256k, - EncLZ4_1M, - EncLZ4_4M, - EncSnappy, - EncFlate, - EncZstd, +var testEncodings = []compression.Encoding{ + compression.EncNone, + compression.EncGZIP, + compression.EncLZ4_64k, + compression.EncLZ4_256k, + compression.EncLZ4_1M, + compression.EncLZ4_4M, + compression.EncSnappy, + compression.EncFlate, + compression.EncZstd, } var ( @@ -84,7 +85,7 @@ const ( ) func TestBlocksInclusive(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc for _, format := range allPossibleFormats { chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt @@ -103,7 +104,7 @@ func TestBlocksInclusive(t *testing.T) { } func TestBlock(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc for _, format := range allPossibleFormats { chunkFormat, headBlockFmt := format.chunkFormat, format.headBlockFmt @@ -258,7 +259,7 @@ func TestBlock(t *testing.T) { } func TestCorruptChunk(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc for _, format := range allPossibleFormats { chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt @@ -298,7 +299,7 @@ func TestCorruptChunk(t *testing.T) { func TestReadFormatV1(t *testing.T) { t.Parallel() - c := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) fillChunk(c) // overrides to v1 for testing that specific version. c.format = ChunkFormatV1 @@ -335,7 +336,7 @@ func TestReadFormatV1(t *testing.T) { // 2) []byte loaded chunks <-> []byte loaded chunks func TestRoundtripV2(t *testing.T) { for _, testData := range allPossibleFormats { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc t.Run(testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { t.Parallel() @@ -390,12 +391,12 @@ func TestRoundtripV2(t *testing.T) { } } -func testNameWithFormats(enc Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string { +func testNameWithFormats(enc compression.Encoding, chunkFormat byte, headBlockFmt HeadBlockFmt) string { return fmt.Sprintf("encoding:%v chunkFormat:%v headBlockFmt:%v", enc, chunkFormat, headBlockFmt) } func TestRoundtripV3(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc for _, format := range allPossibleFormats { chunkfmt, headfmt := format.chunkFormat, format.headBlockFmt @@ -420,7 +421,7 @@ func TestRoundtripV3(t *testing.T) { func TestSerialization(t *testing.T) { for _, testData := range allPossibleFormats { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc // run tests with and without structured metadata since it is optional for _, appendWithStructuredMetadata := range []bool{false, true} { @@ -509,7 +510,7 @@ func TestSerialization(t *testing.T) { func TestChunkFilling(t *testing.T) { for _, testData := range allPossibleFormats { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc t.Run(testNameWithFormats(enc, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { t.Parallel() @@ -557,7 +558,7 @@ func TestChunkFilling(t *testing.T) { func TestGZIPChunkTargetSize(t *testing.T) { t.Parallel() - chk := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) + chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) lineSize := 512 entry := &logproto.Entry{ @@ -680,7 +681,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - tester(t, NewMemChunk(ChunkFormatV3, EncGZIP, f, testBlockSize, testTargetSize)) + tester(t, NewMemChunk(ChunkFormatV3, compression.EncGZIP, f, testBlockSize, testTargetSize)) }) } } @@ -696,7 +697,7 @@ func TestChunkSize(t *testing.T) { var result []res for _, bs := range testBlockSizes { for _, f := range allPossibleFormats { - for _, enc := range testEncoding { + for _, enc := range testEncodings { name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs))) t.Run(name, func(t *testing.T) { c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, bs, testTargetSize) @@ -725,7 +726,7 @@ func TestChunkSize(t *testing.T) { } func TestChunkStats(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, testBlockSize, 0) first := time.Now() entry := &logproto.Entry{ Timestamp: first, @@ -797,7 +798,7 @@ func TestChunkStats(t *testing.T) { func TestIteratorClose(t *testing.T) { for _, f := range allPossibleFormats { - for _, enc := range testEncoding { + for _, enc := range testEncodings { t.Run(enc.String(), func(t *testing.T) { for _, test := range []func(iter iter.EntryIterator, t *testing.T){ func(iter iter.EntryIterator, t *testing.T) { @@ -846,7 +847,7 @@ func BenchmarkWrite(b *testing.B) { i := int64(0) for _, f := range HeadBlockFmts { - for _, enc := range testEncoding { + for _, enc := range testEncodings { for _, withStructuredMetadata := range []bool{false, true} { name := fmt.Sprintf("%v-%v", f, enc) if withStructuredMetadata { @@ -896,7 +897,7 @@ func (nomatchPipeline) ReferencedStructuredMetadata() bool { func BenchmarkRead(b *testing.B) { for _, bs := range testBlockSizes { - for _, enc := range testEncoding { + for _, enc := range testEncodings { name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs))) b.Run(name, func(b *testing.B) { chunks, size := generateData(enc, 5, bs, testTargetSize) @@ -923,7 +924,7 @@ func BenchmarkRead(b *testing.B) { } for _, bs := range testBlockSizes { - for _, enc := range testEncoding { + for _, enc := range testEncodings { name := fmt.Sprintf("sample_%s_%s", enc.String(), humanize.Bytes(uint64(bs))) b.Run(name, func(b *testing.B) { chunks, size := generateData(enc, 5, bs, testTargetSize) @@ -967,7 +968,7 @@ func BenchmarkBackwardIterator(b *testing.B) { for _, bs := range testBlockSizes { b.Run(humanize.Bytes(uint64(bs)), func(b *testing.B) { b.ReportAllocs() - c := NewMemChunk(ChunkFormatV4, EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, DefaultTestHeadBlockFmt, bs, testTargetSize) _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -988,7 +989,7 @@ func BenchmarkBackwardIterator(b *testing.B) { } func TestGenerateDataSize(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { t.Run(enc.String(), func(t *testing.T) { chunks, size := generateData(enc, 50, testBlockSize, testTargetSize) @@ -1081,7 +1082,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { func TestMemChunk_IteratorBounds(t *testing.T) { createChunk := func() *MemChunk { t.Helper() - c := NewMemChunk(ChunkFormatV3, EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6) + c := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, 1e6, 1e6) if _, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(0, 1), @@ -1141,7 +1142,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) { } func TestMemchunkLongLine(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc t.Run(enc.String(), func(t *testing.T) { t.Parallel() @@ -1167,9 +1168,9 @@ func TestMemchunkLongLine(t *testing.T) { func TestBytesWith(t *testing.T) { t.Parallel() - exp, err := NewMemChunk(ChunkFormatV3, EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) + exp, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil) require.Nil(t, err) - out, err := NewMemChunk(ChunkFormatV3, EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) + out, err := NewMemChunk(ChunkFormatV3, compression.EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) require.Nil(t, err) require.Equal(t, exp, out) @@ -1180,8 +1181,8 @@ func TestCheckpointEncoding(t *testing.T) { blockSize, targetSize := 256*1024, 1500*1024 for _, f := range allPossibleFormats { - t.Run(testNameWithFormats(EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) { - c := newMemChunkWithFormat(f.chunkFormat, EncSnappy, f.headBlockFmt, blockSize, targetSize) + t.Run(testNameWithFormats(compression.EncSnappy, f.chunkFormat, f.headBlockFmt), func(t *testing.T) { + c := newMemChunkWithFormat(f.chunkFormat, compression.EncSnappy, f.headBlockFmt, blockSize, targetSize) // add a few entries for i := 0; i < 5; i++ { @@ -1266,7 +1267,7 @@ var ( func BenchmarkBufferedIteratorLabels(b *testing.B) { for _, f := range HeadBlockFmts { b.Run(f.String(), func(b *testing.B) { - c := NewMemChunk(ChunkFormatV3, EncSnappy, f, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV3, compression.EncSnappy, f, testBlockSize, testTargetSize) _ = fillChunk(c) labelsSet := []labels.Labels{ @@ -1366,8 +1367,8 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { func Test_HeadIteratorReverse(t *testing.T) { for _, testData := range allPossibleFormats { - t.Run(testNameWithFormats(EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { - c := newMemChunkWithFormat(testData.chunkFormat, EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize) + t.Run(testNameWithFormats(compression.EncSnappy, testData.chunkFormat, testData.headBlockFmt), func(t *testing.T) { + c := newMemChunkWithFormat(testData.chunkFormat, compression.EncSnappy, testData.headBlockFmt, testBlockSize, testTargetSize) genEntry := func(i int64) *logproto.Entry { return &logproto.Entry{ Timestamp: time.Unix(0, i), @@ -1482,7 +1483,7 @@ func TestMemChunk_Rebound(t *testing.T) { } func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk { - chk := NewMemChunk(ChunkFormatV3, EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) + chk := NewMemChunk(ChunkFormatV3, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) for ; from.Before(through); from = from.Add(time.Second) { _, err := chk.Append(&logproto.Entry{ Line: from.String(), @@ -1603,7 +1604,7 @@ func TestMemChunk_ReboundAndFilter_with_filter(t *testing.T) { } func buildFilterableTestMemChunk(t *testing.T, from, through time.Time, matchingFrom, matchingTo *time.Time, withStructuredMetadata bool) *MemChunk { - chk := NewMemChunk(ChunkFormatV4, EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) + chk := NewMemChunk(ChunkFormatV4, compression.EncGZIP, DefaultTestHeadBlockFmt, defaultBlockSize, 0) t.Logf("from : %v", from.String()) t.Logf("through: %v", through.String()) var structuredMetadata push.LabelsAdapter @@ -1752,7 +1753,7 @@ func TestMemChunk_SpaceFor(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { for _, format := range allPossibleFormats { t.Run(fmt.Sprintf("chunk_v%d_head_%s", format.chunkFormat, format.headBlockFmt), func(t *testing.T) { - chk := newMemChunkWithFormat(format.chunkFormat, EncNone, format.headBlockFmt, 1024, tc.targetSize) + chk := newMemChunkWithFormat(format.chunkFormat, compression.EncNone, format.headBlockFmt, 1024, tc.targetSize) chk.blocks = make([]block, tc.nBlocks) chk.cutBlockSize = tc.cutBlockSize @@ -1775,7 +1776,7 @@ func TestMemChunk_SpaceFor(t *testing.T) { } func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) { - for _, enc := range testEncoding { + for _, enc := range testEncodings { enc := enc t.Run(enc.String(), func(t *testing.T) { streamLabels := labels.Labels{ @@ -2054,7 +2055,7 @@ func TestDecodeChunkIncorrectBlockOffset(t *testing.T) { t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) { for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ { t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) { - chk := NewMemChunk(format.chunkFormat, EncNone, format.headBlockFmt, blockSize, testTargetSize) + chk := NewMemChunk(format.chunkFormat, compression.EncNone, format.headBlockFmt, blockSize, testTargetSize) ts := time.Now().Unix() for i := 0; i < 3; i++ { dup, err := chk.Append(&logproto.Entry{ diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index 486bef44b3da..8c640149a78f 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -1,49 +1,15 @@ package chunkenc import ( - "bufio" "bytes" - "io" - "runtime" "sync" - "github.com/golang/snappy" - "github.com/klauspost/compress/flate" - "github.com/klauspost/compress/gzip" - "github.com/klauspost/compress/zstd" - "github.com/pierrec/lz4/v4" "github.com/prometheus/prometheus/util/pool" "github.com/grafana/loki/v3/pkg/logproto" ) -// WriterPool is a pool of io.Writer -// This is used by every chunk to avoid unnecessary allocations. -type WriterPool interface { - GetWriter(io.Writer) io.WriteCloser - PutWriter(io.WriteCloser) -} - -// ReaderPool similar to WriterPool but for reading chunks. -type ReaderPool interface { - GetReader(io.Reader) (io.Reader, error) - PutReader(io.Reader) -} - var ( - // Gzip is the gnu zip compression pool - Gzip = GzipPool{level: gzip.DefaultCompression} - Lz4_64k = LZ4Pool{bufferSize: 1 << 16} // Lz4_64k is the l4z compression pool, with 64k buffer size - Lz4_256k = LZ4Pool{bufferSize: 1 << 18} // Lz4_256k uses 256k buffer - Lz4_1M = LZ4Pool{bufferSize: 1 << 20} // Lz4_1M uses 1M buffer - Lz4_4M = LZ4Pool{bufferSize: 1 << 22} // Lz4_4M uses 4M buffer - Flate = FlatePool{} - Zstd = ZstdPool{} - // Snappy is the snappy compression pool - Snappy SnappyPool - // Noop is the no compression pool - Noop NoopPool - // BytesBufferPool is a bytes buffer used for lines decompressed. // Buckets [0.5KB,1KB,2KB,4KB,8KB] BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) @@ -81,315 +47,3 @@ var ( }, } ) - -func GetWriterPool(enc Encoding) WriterPool { - return GetReaderPool(enc).(WriterPool) -} - -func GetReaderPool(enc Encoding) ReaderPool { - switch enc { - case EncGZIP: - return &Gzip - case EncLZ4_64k: - return &Lz4_64k - case EncLZ4_256k: - return &Lz4_256k - case EncLZ4_1M: - return &Lz4_1M - case EncLZ4_4M: - return &Lz4_4M - case EncSnappy: - return &Snappy - case EncNone: - return &Noop - case EncFlate: - return &Flate - case EncZstd: - return &Zstd - default: - panic("unknown encoding") - } -} - -// GzipPool is a gun zip compression pool -type GzipPool struct { - readers sync.Pool - writers sync.Pool - level int -} - -// Gzip needs buffering to read efficiently. -// We need to be able to see the underlying gzip.Reader to Reset it. -type gzipBufferedReader struct { - *bufio.Reader - gzipReader *gzip.Reader -} - -// GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error) { - if r := pool.readers.Get(); r != nil { - reader := r.(*gzipBufferedReader) - err := reader.gzipReader.Reset(src) - if err != nil { - return nil, err - } - reader.Reader.Reset(reader.gzipReader) - return reader, nil - } - gzipReader, err := gzip.NewReader(src) - if err != nil { - return nil, err - } - return &gzipBufferedReader{ - gzipReader: gzipReader, - Reader: bufio.NewReaderSize(gzipReader, 4*1024), - }, nil -} - -// PutReader places back in the pool a CompressionReader -func (pool *GzipPool) PutReader(reader io.Reader) { - pool.readers.Put(reader) -} - -// GetWriter gets or creates a new CompressionWriter and reset it to write to dst -func (pool *GzipPool) GetWriter(dst io.Writer) io.WriteCloser { - if w := pool.writers.Get(); w != nil { - writer := w.(*gzip.Writer) - writer.Reset(dst) - return writer - } - - level := pool.level - if level == 0 { - level = gzip.DefaultCompression - } - w, err := gzip.NewWriterLevel(dst, level) - if err != nil { - panic(err) // never happens, error is only returned on wrong compression level. - } - return w -} - -// PutWriter places back in the pool a CompressionWriter -func (pool *GzipPool) PutWriter(writer io.WriteCloser) { - pool.writers.Put(writer) -} - -// FlatePool is a flate compression pool -type FlatePool struct { - readers sync.Pool - writers sync.Pool - level int -} - -// GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error) { - if r := pool.readers.Get(); r != nil { - reader := r.(flate.Resetter) - err := reader.Reset(src, nil) - if err != nil { - panic(err) - } - return reader.(io.Reader), nil - } - return flate.NewReader(src), nil -} - -// PutReader places back in the pool a CompressionReader -func (pool *FlatePool) PutReader(reader io.Reader) { - pool.readers.Put(reader) -} - -// GetWriter gets or creates a new CompressionWriter and reset it to write to dst -func (pool *FlatePool) GetWriter(dst io.Writer) io.WriteCloser { - if w := pool.writers.Get(); w != nil { - writer := w.(*flate.Writer) - writer.Reset(dst) - return writer - } - - level := pool.level - if level == 0 { - level = flate.DefaultCompression - } - w, err := flate.NewWriter(dst, level) - if err != nil { - panic(err) // never happens, error is only returned on wrong compression level. - } - return w -} - -// PutWriter places back in the pool a CompressionWriter -func (pool *FlatePool) PutWriter(writer io.WriteCloser) { - pool.writers.Put(writer) -} - -// GzipPool is a gun zip compression pool -type ZstdPool struct { - readers sync.Pool - writers sync.Pool -} - -// GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error) { - if r := pool.readers.Get(); r != nil { - reader := r.(*zstd.Decoder) - err := reader.Reset(src) - if err != nil { - return nil, err - } - return reader, nil - } - reader, err := zstd.NewReader(src) - if err != nil { - return nil, err - } - runtime.SetFinalizer(reader, (*zstd.Decoder).Close) - return reader, nil -} - -// PutReader places back in the pool a CompressionReader -func (pool *ZstdPool) PutReader(reader io.Reader) { - pool.readers.Put(reader) -} - -// GetWriter gets or creates a new CompressionWriter and reset it to write to dst -func (pool *ZstdPool) GetWriter(dst io.Writer) io.WriteCloser { - if w := pool.writers.Get(); w != nil { - writer := w.(*zstd.Encoder) - writer.Reset(dst) - return writer - } - - w, err := zstd.NewWriter(dst) - if err != nil { - panic(err) // never happens, error is only returned on wrong compression level. - } - return w -} - -// PutWriter places back in the pool a CompressionWriter -func (pool *ZstdPool) PutWriter(writer io.WriteCloser) { - pool.writers.Put(writer) -} - -type LZ4Pool struct { - readers sync.Pool - writers sync.Pool - bufferSize uint32 // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set. -} - -// We need to be able to see the underlying lz4.Reader to Reset it. -type lz4BufferedReader struct { - *bufio.Reader - lz4Reader *lz4.Reader -} - -// GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) { - var r *lz4BufferedReader - if pooled := pool.readers.Get(); pooled != nil { - r = pooled.(*lz4BufferedReader) - r.lz4Reader.Reset(src) - r.Reader.Reset(r.lz4Reader) - } else { - lz4Reader := lz4.NewReader(src) - r = &lz4BufferedReader{ - lz4Reader: lz4Reader, - Reader: bufio.NewReaderSize(lz4Reader, 4*1024), - } - } - return r, nil -} - -// PutReader places back in the pool a CompressionReader -func (pool *LZ4Pool) PutReader(reader io.Reader) { - pool.readers.Put(reader) -} - -// GetWriter gets or creates a new CompressionWriter and reset it to write to dst -func (pool *LZ4Pool) GetWriter(dst io.Writer) io.WriteCloser { - var w *lz4.Writer - if fromPool := pool.writers.Get(); fromPool != nil { - w = fromPool.(*lz4.Writer) - w.Reset(dst) - } else { - w = lz4.NewWriter(dst) - } - err := w.Apply( - lz4.ChecksumOption(false), - lz4.BlockSizeOption(lz4.BlockSize(pool.bufferSize)), - lz4.CompressionLevelOption(lz4.Fast), - ) - if err != nil { - panic(err) - } - return w -} - -// PutWriter places back in the pool a CompressionWriter -func (pool *LZ4Pool) PutWriter(writer io.WriteCloser) { - pool.writers.Put(writer) -} - -type SnappyPool struct { - readers sync.Pool - writers sync.Pool -} - -// GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) { - if r := pool.readers.Get(); r != nil { - reader := r.(*snappy.Reader) - reader.Reset(src) - return reader, nil - } - return snappy.NewReader(src), nil -} - -// PutReader places back in the pool a CompressionReader -func (pool *SnappyPool) PutReader(reader io.Reader) { - r := reader.(*snappy.Reader) - // Reset to free reference to the underlying reader - r.Reset(nil) - pool.readers.Put(reader) -} - -// GetWriter gets or creates a new CompressionWriter and reset it to write to dst -func (pool *SnappyPool) GetWriter(dst io.Writer) io.WriteCloser { - if w := pool.writers.Get(); w != nil { - writer := w.(*snappy.Writer) - writer.Reset(dst) - return writer - } - return snappy.NewBufferedWriter(dst) -} - -// PutWriter places back in the pool a CompressionWriter -func (pool *SnappyPool) PutWriter(writer io.WriteCloser) { - pool.writers.Put(writer) -} - -type NoopPool struct{} - -// GetReader gets or creates a new CompressionReader and reset it to read from src -func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) { - return src, nil -} - -// PutReader places back in the pool a CompressionReader -func (pool *NoopPool) PutReader(_ io.Reader) {} - -type noopCloser struct { - io.Writer -} - -func (noopCloser) Close() error { return nil } - -// GetWriter gets or creates a new CompressionWriter and reset it to write to dst -func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser { - return noopCloser{dst} -} - -// PutWriter places back in the pool a CompressionWriter -func (pool *NoopPool) PutWriter(_ io.WriteCloser) {} diff --git a/pkg/chunkenc/pool_test.go b/pkg/chunkenc/pool_test.go deleted file mode 100644 index 04ecaadf9295..000000000000 --- a/pkg/chunkenc/pool_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package chunkenc - -import ( - "bytes" - "io" - "os" - "runtime" - "runtime/pprof" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestPool(t *testing.T) { - var wg sync.WaitGroup - for _, enc := range supportedEncoding { - enc := enc - for i := 0; i < 200; i++ { - wg.Add(1) - go func() { - defer wg.Done() - var ( - buf = bytes.NewBuffer(nil) - res = make([]byte, 1024) - wpool = GetWriterPool(enc) - rpool = GetReaderPool(enc) - ) - - w := wpool.GetWriter(buf) - defer wpool.PutWriter(w) - _, err := w.Write([]byte("test")) - require.NoError(t, err) - require.NoError(t, w.Close()) - - require.True(t, buf.Len() != 0, enc) - r, err := rpool.GetReader(bytes.NewBuffer(buf.Bytes())) - require.NoError(t, err) - defer rpool.PutReader(r) - n, err := r.Read(res) - if err != nil { - require.Error(t, err, io.EOF) - } - require.Equal(t, 4, n, enc.String()) - require.Equal(t, []byte("test"), res[:n], enc) - }() - } - } - - wg.Wait() - - if !assert.Eventually(t, func() bool { - runtime.GC() - return runtime.NumGoroutine() <= 50 - }, 5*time.Second, 10*time.Millisecond) { - _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) - } -} diff --git a/pkg/chunkenc/symbols.go b/pkg/chunkenc/symbols.go index e9f0b4952968..f5d3310921ab 100644 --- a/pkg/chunkenc/symbols.go +++ b/pkg/chunkenc/symbols.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/util" ) @@ -163,7 +164,7 @@ func (s *symbolizer) CheckpointSize() int { // SerializeTo serializes all the labels and writes to the writer in compressed format. // It returns back the number of bytes written and a checksum of the data written. -func (s *symbolizer) SerializeTo(w io.Writer, pool WriterPool) (int, []byte, error) { +func (s *symbolizer) SerializeTo(w io.Writer, pool compression.WriterPool) (int, []byte, error) { crc32Hash := crc32HashPool.Get().(hash.Hash32) defer crc32HashPool.Put(crc32Hash) @@ -324,7 +325,7 @@ func symbolizerFromCheckpoint(b []byte) *symbolizer { } // symbolizerFromEnc builds symbolizer from the bytes generated during serialization. -func symbolizerFromEnc(b []byte, pool ReaderPool) (*symbolizer, error) { +func symbolizerFromEnc(b []byte, pool compression.ReaderPool) (*symbolizer, error) { db := decbuf{b: b} numLabels := db.uvarint() diff --git a/pkg/chunkenc/symbols_test.go b/pkg/chunkenc/symbols_test.go index 7882001c75dd..1f286d7b56d5 100644 --- a/pkg/chunkenc/symbols_test.go +++ b/pkg/chunkenc/symbols_test.go @@ -8,6 +8,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/compression" ) func TestSymbolizer(t *testing.T) { @@ -125,7 +127,7 @@ func TestSymbolizer(t *testing.T) { expectedUncompressedSize: 22, }, } { - for _, encoding := range testEncoding { + for _, encoding := range testEncodings { t.Run(fmt.Sprintf("%s - %s", tc.name, encoding), func(t *testing.T) { s := newSymbolizer() for i, labels := range tc.labelsToAdd { @@ -161,10 +163,10 @@ func TestSymbolizer(t *testing.T) { } buf.Reset() - _, _, err = s.SerializeTo(buf, GetWriterPool(encoding)) + _, _, err = s.SerializeTo(buf, compression.GetWriterPool(encoding)) require.NoError(t, err) - loaded, err = symbolizerFromEnc(buf.Bytes(), GetReaderPool(encoding)) + loaded, err = symbolizerFromEnc(buf.Bytes(), compression.GetReaderPool(encoding)) require.NoError(t, err) for i, symbols := range tc.expectedSymbols { require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols, nil)) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index aed6606c7c6d..3132c77206ab 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -28,7 +29,7 @@ type HeadBlock interface { CheckpointBytes(b []byte) ([]byte, error) CheckpointSize() int LoadBytes(b []byte) error - Serialise(pool WriterPool) ([]byte, error) + Serialise(pool compression.WriterPool) ([]byte, error) Reset() Bounds() (mint, maxt int64) Entries() int @@ -373,7 +374,7 @@ func (hb *unorderedHeadBlock) SampleIterator( // nolint:unused // serialise is used in creating an ordered, compressed block from an unorderedHeadBlock -func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { +func (hb *unorderedHeadBlock) Serialise(pool compression.WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { inBuf.Reset() diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 8a3420965bdb..fb341aaa8db9 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -450,7 +451,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) { } func TestUnorderedChunkIterators(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for i := 0; i < 100; i++ { // push in reverse order dup, err := c.Append(&logproto.Entry{ @@ -496,11 +497,11 @@ func TestUnorderedChunkIterators(t *testing.T) { } func BenchmarkUnorderedRead(b *testing.B) { - legacy := NewMemChunk(ChunkFormatV3, EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize) + legacy := NewMemChunk(ChunkFormatV3, compression.EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize) fillChunkClose(legacy, false) - ordered := NewMemChunk(ChunkFormatV3, EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + ordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) fillChunkClose(ordered, false) - unordered := NewMemChunk(ChunkFormatV3, EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + unordered := NewMemChunk(ChunkFormatV3, compression.EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) fillChunkRandomOrder(unordered, false) tcs := []struct { @@ -558,7 +559,7 @@ func BenchmarkUnorderedRead(b *testing.B) { } func TestUnorderedIteratorCountsAllEntries(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) fillChunkRandomOrder(c, false) ct := 0 @@ -595,7 +596,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) { } func chunkFrom(xs []logproto.Entry) ([]byte, error) { - c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range xs { if _, err := c.Append(&x); err != nil { return nil, err @@ -655,7 +656,7 @@ func TestReorder(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range tc.input { dup, err := c.Append(&x) require.False(t, dup) @@ -674,7 +675,7 @@ func TestReorder(t *testing.T) { } func TestReorderAcrossBlocks(t *testing.T) { - c := NewMemChunk(ChunkFormatV4, EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(ChunkFormatV4, compression.EncSnappy, UnorderedWithStructuredMetadataHeadBlockFmt, testBlockSize, testTargetSize) for _, batch := range [][]int{ // ensure our blocks have overlapping bounds and must be reordered // before closing. diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index 3da8f9e6d5cb..0d75273d6c81 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/grafana/loki/v3/pkg/chunkenc/testdata" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -23,7 +24,7 @@ func logprotoEntryWithStructuredMetadata(ts int64, line string, structuredMetada } } -func generateData(enc Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) { +func generateData(enc compression.Encoding, chunksCount, blockSize, targetSize int) ([]Chunk, uint64) { chunks := []Chunk{} i := int64(0) size := uint64(0) diff --git a/pkg/compactor/deletion/delete_requests_table.go b/pkg/compactor/deletion/delete_requests_table.go index 80a47a5e6435..7d4c5cf4d254 100644 --- a/pkg/compactor/deletion/delete_requests_table.go +++ b/pkg/compactor/deletion/delete_requests_table.go @@ -13,7 +13,7 @@ import ( "github.com/go-kit/log/level" "go.etcd.io/bbolt" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" "github.com/grafana/loki/v3/pkg/storage/stores/series/index" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" @@ -117,8 +117,9 @@ func (t *deleteRequestsTable) uploadFile() error { }() err = t.db.View(func(tx *bbolt.Tx) (err error) { - compressedWriter := chunkenc.Gzip.GetWriter(f) - defer chunkenc.Gzip.PutWriter(compressedWriter) + gzipPool := compression.GetWriterPool(compression.EncGZIP) + compressedWriter := gzipPool.GetWriter(f) + defer gzipPool.PutWriter(compressedWriter) defer func() { cerr := compressedWriter.Close() diff --git a/pkg/compactor/index_set.go b/pkg/compactor/index_set.go index 7102aef56425..76b5546a9628 100644 --- a/pkg/compactor/index_set.go +++ b/pkg/compactor/index_set.go @@ -12,8 +12,8 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" - "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/compactor/retention" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" @@ -229,8 +229,9 @@ func (is *indexSet) upload() error { } }() - compressedWriter := chunkenc.Gzip.GetWriter(f) - defer chunkenc.Gzip.PutWriter(compressedWriter) + gzipPool := compression.GetWriterPool(compression.EncGZIP) + compressedWriter := gzipPool.GetWriter(f) + defer gzipPool.PutWriter(compressedWriter) idxReader, err := idx.Reader() if err != nil { diff --git a/pkg/compactor/retention/retention_test.go b/pkg/compactor/retention/retention_test.go index 4885c835003c..3b124691087a 100644 --- a/pkg/compactor/retention/retention_test.go +++ b/pkg/compactor/retention/retention_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" ingesterclient "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -220,7 +221,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := ingesterclient.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { dup, err := chunkEnc.Append(&logproto.Entry{ diff --git a/pkg/compression/encoding.go b/pkg/compression/encoding.go new file mode 100644 index 000000000000..6b421ed97644 --- /dev/null +++ b/pkg/compression/encoding.go @@ -0,0 +1,86 @@ +package compression + +import ( + "fmt" + "strings" +) + +// Encoding identifies an available compression type. +type Encoding byte + +// The different available encodings. +// Make sure to preserve the order, as the numeric values are serialized! +const ( + EncNone Encoding = iota + EncGZIP + EncDumb + EncLZ4_64k + EncSnappy + EncLZ4_256k + EncLZ4_1M + EncLZ4_4M + EncFlate + EncZstd +) + +var supportedEncoding = []Encoding{ + EncNone, + EncGZIP, + EncLZ4_64k, + EncSnappy, + EncLZ4_256k, + EncLZ4_1M, + EncLZ4_4M, + EncFlate, + EncZstd, +} + +func (e Encoding) String() string { + switch e { + case EncGZIP: + return "gzip" + case EncNone: + return "none" + case EncDumb: + return "dumb" + case EncLZ4_64k: + return "lz4-64k" + case EncLZ4_256k: + return "lz4-256k" + case EncLZ4_1M: + return "lz4-1M" + case EncLZ4_4M: + return "lz4" + case EncSnappy: + return "snappy" + case EncFlate: + return "flate" + case EncZstd: + return "zstd" + default: + return "unknown" + } +} + +// ParseEncoding parses an chunk encoding (compression algorithm) by its name. +func ParseEncoding(enc string) (Encoding, error) { + for _, e := range supportedEncoding { + if strings.EqualFold(e.String(), enc) { + return e, nil + } + } + return 0, fmt.Errorf("invalid encoding: %s, supported: %s", enc, SupportedEncoding()) + +} + +// SupportedEncoding returns the list of supported Encoding. +func SupportedEncoding() string { + var sb strings.Builder + for i := range supportedEncoding { + sb.WriteString(supportedEncoding[i].String()) + if i != len(supportedEncoding)-1 { + sb.WriteString(", ") + } + } + return sb.String() +} diff --git a/pkg/compression/encoding_test.go b/pkg/compression/encoding_test.go new file mode 100644 index 000000000000..d67323ebb2d4 --- /dev/null +++ b/pkg/compression/encoding_test.go @@ -0,0 +1,26 @@ +package compression + +import "testing" + +func TestParseEncoding(t *testing.T) { + tests := []struct { + enc string + want Encoding + wantErr bool + }{ + {"gzip", EncGZIP, false}, + {"bad", 0, true}, + } + for _, tt := range tests { + t.Run(tt.enc, func(t *testing.T) { + got, err := ParseEncoding(tt.enc) + if (err != nil) != tt.wantErr { + t.Errorf("ParseEncoding() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("ParseEncoding() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/compression/pool.go b/pkg/compression/pool.go new file mode 100644 index 000000000000..b68ff7de47b1 --- /dev/null +++ b/pkg/compression/pool.go @@ -0,0 +1,368 @@ +package compression + +import ( + "bufio" + "io" + "runtime" + "sync" + + snappylib "github.com/golang/snappy" + flatelib "github.com/klauspost/compress/flate" + gziplib "github.com/klauspost/compress/gzip" + zstdlib "github.com/klauspost/compress/zstd" + lz4lib "github.com/pierrec/lz4/v4" +) + +// WriterPool is a pool of io.Writer +// This is used by every chunk to avoid unnecessary allocations. +type WriterPool interface { + GetWriter(io.Writer) io.WriteCloser + PutWriter(io.WriteCloser) +} + +// ReaderPool is a pool of io.Reader +// ReaderPool similar to WriterPool but for reading chunks. +type ReaderPool interface { + GetReader(io.Reader) (io.Reader, error) + PutReader(io.Reader) +} + +// ReaderPool is a pool of io.Reader and io.Writer +type ReaderWriterPool interface { + ReaderPool + WriterPool +} + +var ( + // gzip is the gnu zip compression pool + gzip = GzipPool{level: gziplib.DefaultCompression} + // lz4_* are the lz4 compression pools + lz4_64k = LZ4Pool{bufferSize: 1 << 16} // lz4_64k is the l4z compression pool, with 64k buffer size + lz4_256k = LZ4Pool{bufferSize: 1 << 18} // lz4_256k uses 256k buffer + lz4_1M = LZ4Pool{bufferSize: 1 << 20} // lz4_1M uses 1M buffer + lz4_4M = LZ4Pool{bufferSize: 1 << 22} // lz4_4M uses 4M buffer + // flate is the flate compression pool + flate = FlatePool{} + // zstd is the zstd compression pool + zstd = ZstdPool{} + // snappy is the snappy compression pool + snappy = SnappyPool{} + // noop is the no compression pool + noop = NoopPool{} +) + +func GetWriterPool(enc Encoding) WriterPool { + return GetPool(enc).(WriterPool) +} + +func GetReaderPool(enc Encoding) ReaderPool { + return GetPool(enc).(ReaderPool) +} + +func GetPool(enc Encoding) ReaderWriterPool { + switch enc { + case EncGZIP: + return &gzip + case EncLZ4_64k: + return &lz4_64k + case EncLZ4_256k: + return &lz4_256k + case EncLZ4_1M: + return &lz4_1M + case EncLZ4_4M: + return &lz4_4M + case EncSnappy: + return &snappy + case EncNone: + return &noop + case EncFlate: + return &flate + case EncZstd: + return &zstd + default: + panic("unknown encoding") + } +} + +// GzipPool is a gnu zip compression pool +type GzipPool struct { + readers sync.Pool + writers sync.Pool + level int +} + +// Gzip needs buffering to read efficiently. +// We need to be able to see the underlying gzip.Reader to Reset it. +type gzipBufferedReader struct { + *bufio.Reader + gzipReader *gziplib.Reader +} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error) { + if r := pool.readers.Get(); r != nil { + reader := r.(*gzipBufferedReader) + err := reader.gzipReader.Reset(src) + if err != nil { + return nil, err + } + reader.Reader.Reset(reader.gzipReader) + return reader, nil + } + gzipReader, err := gziplib.NewReader(src) + if err != nil { + return nil, err + } + return &gzipBufferedReader{ + gzipReader: gzipReader, + Reader: bufio.NewReaderSize(gzipReader, 4*1024), + }, nil +} + +// PutReader places back in the pool a CompressionReader +func (pool *GzipPool) PutReader(reader io.Reader) { + pool.readers.Put(reader) +} + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *GzipPool) GetWriter(dst io.Writer) io.WriteCloser { + if w := pool.writers.Get(); w != nil { + writer := w.(*gziplib.Writer) + writer.Reset(dst) + return writer + } + + level := pool.level + if level == 0 { + level = gziplib.DefaultCompression + } + w, err := gziplib.NewWriterLevel(dst, level) + if err != nil { + panic(err) // never happens, error is only returned on wrong compression level. + } + return w +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *GzipPool) PutWriter(writer io.WriteCloser) { + pool.writers.Put(writer) +} + +// FlatePool is a flate compression pool +type FlatePool struct { + readers sync.Pool + writers sync.Pool + level int +} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error) { + if r := pool.readers.Get(); r != nil { + reader := r.(flatelib.Resetter) + err := reader.Reset(src, nil) + if err != nil { + panic(err) + } + return reader.(io.Reader), nil + } + return flatelib.NewReader(src), nil +} + +// PutReader places back in the pool a CompressionReader +func (pool *FlatePool) PutReader(reader io.Reader) { + pool.readers.Put(reader) +} + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *FlatePool) GetWriter(dst io.Writer) io.WriteCloser { + if w := pool.writers.Get(); w != nil { + writer := w.(*flatelib.Writer) + writer.Reset(dst) + return writer + } + + level := pool.level + if level == 0 { + level = flatelib.DefaultCompression + } + w, err := flatelib.NewWriter(dst, level) + if err != nil { + panic(err) // never happens, error is only returned on wrong compression level. + } + return w +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *FlatePool) PutWriter(writer io.WriteCloser) { + pool.writers.Put(writer) +} + +// GzipPool is a gun zip compression pool +type ZstdPool struct { + readers sync.Pool + writers sync.Pool +} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error) { + if r := pool.readers.Get(); r != nil { + reader := r.(*zstdlib.Decoder) + err := reader.Reset(src) + if err != nil { + return nil, err + } + return reader, nil + } + reader, err := zstdlib.NewReader(src) + if err != nil { + return nil, err + } + runtime.SetFinalizer(reader, (*zstdlib.Decoder).Close) + return reader, nil +} + +// PutReader places back in the pool a CompressionReader +func (pool *ZstdPool) PutReader(reader io.Reader) { + pool.readers.Put(reader) +} + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *ZstdPool) GetWriter(dst io.Writer) io.WriteCloser { + if w := pool.writers.Get(); w != nil { + writer := w.(*zstdlib.Encoder) + writer.Reset(dst) + return writer + } + + w, err := zstdlib.NewWriter(dst) + if err != nil { + panic(err) // never happens, error is only returned on wrong compression level. + } + return w +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *ZstdPool) PutWriter(writer io.WriteCloser) { + pool.writers.Put(writer) +} + +type LZ4Pool struct { + readers sync.Pool + writers sync.Pool + bufferSize uint32 // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set. +} + +// We need to be able to see the underlying lz4.Reader to Reset it. +type lz4BufferedReader struct { + *bufio.Reader + lz4Reader *lz4lib.Reader +} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error) { + var r *lz4BufferedReader + if pooled := pool.readers.Get(); pooled != nil { + r = pooled.(*lz4BufferedReader) + r.lz4Reader.Reset(src) + r.Reader.Reset(r.lz4Reader) + } else { + lz4Reader := lz4lib.NewReader(src) + r = &lz4BufferedReader{ + lz4Reader: lz4Reader, + Reader: bufio.NewReaderSize(lz4Reader, 4*1024), + } + } + return r, nil +} + +// PutReader places back in the pool a CompressionReader +func (pool *LZ4Pool) PutReader(reader io.Reader) { + pool.readers.Put(reader) +} + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *LZ4Pool) GetWriter(dst io.Writer) io.WriteCloser { + var w *lz4lib.Writer + if fromPool := pool.writers.Get(); fromPool != nil { + w = fromPool.(*lz4lib.Writer) + w.Reset(dst) + } else { + w = lz4lib.NewWriter(dst) + } + err := w.Apply( + lz4lib.ChecksumOption(false), + lz4lib.BlockSizeOption(lz4lib.BlockSize(pool.bufferSize)), + lz4lib.CompressionLevelOption(lz4lib.Fast), + ) + if err != nil { + panic(err) + } + return w +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *LZ4Pool) PutWriter(writer io.WriteCloser) { + pool.writers.Put(writer) +} + +type SnappyPool struct { + readers sync.Pool + writers sync.Pool +} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) { + if r := pool.readers.Get(); r != nil { + reader := r.(*snappylib.Reader) + reader.Reset(src) + return reader, nil + } + return snappylib.NewReader(src), nil +} + +// PutReader places back in the pool a CompressionReader +func (pool *SnappyPool) PutReader(reader io.Reader) { + r := reader.(*snappylib.Reader) + // Reset to free reference to the underlying reader + r.Reset(nil) + pool.readers.Put(reader) +} + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *SnappyPool) GetWriter(dst io.Writer) io.WriteCloser { + if w := pool.writers.Get(); w != nil { + writer := w.(*snappylib.Writer) + writer.Reset(dst) + return writer + } + return snappylib.NewBufferedWriter(dst) +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *SnappyPool) PutWriter(writer io.WriteCloser) { + pool.writers.Put(writer) +} + +type NoopPool struct{} + +// GetReader gets or creates a new CompressionReader and reset it to read from src +func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error) { + return src, nil +} + +// PutReader places back in the pool a CompressionReader +func (pool *NoopPool) PutReader(_ io.Reader) {} + +type noopCloser struct { + io.Writer +} + +func (noopCloser) Close() error { return nil } + +// GetWriter gets or creates a new CompressionWriter and reset it to write to dst +func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser { + return noopCloser{dst} +} + +// PutWriter places back in the pool a CompressionWriter +func (pool *NoopPool) PutWriter(_ io.WriteCloser) {} diff --git a/pkg/compression/pool_test.go b/pkg/compression/pool_test.go new file mode 100644 index 000000000000..b39bbe0ad6f4 --- /dev/null +++ b/pkg/compression/pool_test.go @@ -0,0 +1,64 @@ +package compression + +import ( + "bytes" + "io" + "os" + "runtime" + "runtime/pprof" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPool(t *testing.T) { + for _, enc := range supportedEncoding { + enc := enc + t.Run(enc.String(), func(t *testing.T) { + var wg sync.WaitGroup + + for i := 0; i < 200; i++ { + wg.Add(1) + go func() { + defer wg.Done() + var ( + buf = bytes.NewBuffer(nil) + res = make([]byte, 1024) + wpool = GetWriterPool(enc) + rpool = GetReaderPool(enc) + ) + + w := wpool.GetWriter(buf) + defer wpool.PutWriter(w) + _, err := w.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, w.Close()) + + require.True(t, buf.Len() != 0, enc) + r, err := rpool.GetReader(bytes.NewBuffer(buf.Bytes())) + require.NoError(t, err) + defer rpool.PutReader(r) + n, err := r.Read(res) + if err != nil { + require.Error(t, err, io.EOF) + } + require.Equal(t, 4, n, enc.String()) + require.Equal(t, []byte("test"), res[:n], enc) + }() + } + + wg.Wait() + + if !assert.Eventually(t, func() bool { + runtime.GC() + return runtime.NumGoroutine() <= 50 + }, 5*time.Second, 10*time.Millisecond) { + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + } + + }) + } +} diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 5a816a3b779d..88e770d0c2da 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" @@ -565,7 +566,7 @@ func buildChunks(t testing.TB, size int) []Chunk { for i := 0; i < size; i++ { // build chunks of 256k blocks, 1.5MB target size. Same as default config. - c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024) + c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV3, compression.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024) fillChunk(t, c) descs = append(descs, chunkDesc{ chunk: c, diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index f6a16731e6d4..961b256ea58c 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/log" @@ -49,7 +50,7 @@ func TestIterator(t *testing.T) { }{ {"dumbChunk", chunkenc.NewDumbChunk}, {"gzipChunk", func() chunkenc.Chunk { - return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + return chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 458da1132c96..ee2ad1d8f681 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/ingester/wal" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -58,7 +59,7 @@ func Test_EncodingChunks(t *testing.T) { t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) { conf := tc.conf - c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) + c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) fillChunk(t, c) if close { require.Nil(t, c.Close()) @@ -121,7 +122,7 @@ func Test_EncodingChunks(t *testing.T) { func Test_EncodingCheckpoint(t *testing.T) { conf := dummyConf() - c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) + c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) dup, err := c.Append(&logproto.Entry{ Timestamp: time.Unix(1, 0), Line: "hi there", diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 69462a3d352a..5ef40d9d17d0 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/dskit/tenant" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/ingester/wal" @@ -188,7 +189,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc { for i := range res { res[i] = &chunkDesc{ closed: true, - chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), + chunk: chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), } fillChunk(t, res[i].chunk) require.NoError(t, res[i].chunk.Close()) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9c913f9049f4..81c3c1a68350 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -14,14 +14,6 @@ import ( "sync" "time" - "github.com/grafana/loki/v3/pkg/kafka" - "github.com/grafana/loki/v3/pkg/kafka/partitionring" - "github.com/grafana/loki/v3/pkg/loghttp/push" - "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" - "github.com/grafana/loki/v3/pkg/storage/types" - - lokilog "github.com/grafana/loki/v3/pkg/logql/log" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" @@ -38,17 +30,20 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc/health/grpc_health_v1" - server_util "github.com/grafana/loki/v3/pkg/util/server" - "github.com/grafana/loki/v3/pkg/analytics" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/ingester/index" "github.com/grafana/loki/v3/pkg/iter" + "github.com/grafana/loki/v3/pkg/kafka" + "github.com/grafana/loki/v3/pkg/kafka/partitionring" + "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql" + lokilog "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/runtime" @@ -59,8 +54,10 @@ import ( indexstore "github.com/grafana/loki/v3/pkg/storage/stores/index" "github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume" index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" + "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/util" util_log "github.com/grafana/loki/v3/pkg/util/log" + server_util "github.com/grafana/loki/v3/pkg/util/server" "github.com/grafana/loki/v3/pkg/util/wal" ) @@ -90,18 +87,18 @@ var ( type Config struct { LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."` - ConcurrentFlushes int `yaml:"concurrent_flushes"` - FlushCheckPeriod time.Duration `yaml:"flush_check_period"` - FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` - FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` - RetainPeriod time.Duration `yaml:"chunk_retain_period"` - MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` - BlockSize int `yaml:"chunk_block_size"` - TargetChunkSize int `yaml:"chunk_target_size"` - ChunkEncoding string `yaml:"chunk_encoding"` - parsedEncoding chunkenc.Encoding `yaml:"-"` // placeholder for validated encoding - MaxChunkAge time.Duration `yaml:"max_chunk_age"` - AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"` + ConcurrentFlushes int `yaml:"concurrent_flushes"` + FlushCheckPeriod time.Duration `yaml:"flush_check_period"` + FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` + FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` + RetainPeriod time.Duration `yaml:"chunk_retain_period"` + MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` + BlockSize int `yaml:"chunk_block_size"` + TargetChunkSize int `yaml:"chunk_target_size"` + ChunkEncoding string `yaml:"chunk_encoding"` + parsedEncoding compression.Encoding `yaml:"-"` // placeholder for validated encoding + MaxChunkAge time.Duration `yaml:"max_chunk_age"` + AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"` // Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments. SyncPeriod time.Duration `yaml:"sync_period"` @@ -151,7 +148,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.") f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.") f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB - f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding())) + f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", compression.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", compression.SupportedEncoding())) f.DurationVar(&cfg.SyncPeriod, "ingester.sync-period", 1*time.Hour, "Parameters used to synchronize ingesters to cut chunks at the same moment. Sync period is used to roll over incoming entry to a new chunk. If chunk's utilization isn't high enough (eg. less than 50% when sync_min_utilization is set to 0.5), then this chunk rollover doesn't happen.") f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0.1, "Minimum utilization of chunk when doing synchronization.") f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "The maximum number of errors a stream will report to the user when a push fails. 0 to make unlimited.") @@ -165,7 +162,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } func (cfg *Config) Validate() error { - enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding) + enc, err := compression.ParseEncoding(cfg.ChunkEncoding) if err != nil { return err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 17d34b57dc54..bd43daec7a31 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -32,7 +32,7 @@ import ( "github.com/grafana/dskit/tenant" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/distributor/writefailures" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/ingester/index" @@ -697,7 +697,7 @@ func TestValidate(t *testing.T) { }{ { in: Config{ - ChunkEncoding: chunkenc.EncGZIP.String(), + ChunkEncoding: compression.EncGZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -708,7 +708,7 @@ func TestValidate(t *testing.T) { MaxChunkAge: time.Minute, }, expected: Config{ - ChunkEncoding: chunkenc.EncGZIP.String(), + ChunkEncoding: compression.EncGZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -717,12 +717,12 @@ func TestValidate(t *testing.T) { FlushOpTimeout: 15 * time.Second, IndexShards: index.DefaultIndexShards, MaxChunkAge: time.Minute, - parsedEncoding: chunkenc.EncGZIP, + parsedEncoding: compression.EncGZIP, }, }, { in: Config{ - ChunkEncoding: chunkenc.EncSnappy.String(), + ChunkEncoding: compression.EncSnappy.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -732,7 +732,7 @@ func TestValidate(t *testing.T) { IndexShards: index.DefaultIndexShards, }, expected: Config{ - ChunkEncoding: chunkenc.EncSnappy.String(), + ChunkEncoding: compression.EncSnappy.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -740,7 +740,7 @@ func TestValidate(t *testing.T) { }, FlushOpTimeout: 15 * time.Second, IndexShards: index.DefaultIndexShards, - parsedEncoding: chunkenc.EncSnappy, + parsedEncoding: compression.EncSnappy, }, }, { @@ -758,7 +758,7 @@ func TestValidate(t *testing.T) { }, { in: Config{ - ChunkEncoding: chunkenc.EncGZIP.String(), + ChunkEncoding: compression.EncGZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -771,7 +771,7 @@ func TestValidate(t *testing.T) { }, { in: Config{ - ChunkEncoding: chunkenc.EncGZIP.String(), + ChunkEncoding: compression.EncGZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, @@ -784,7 +784,7 @@ func TestValidate(t *testing.T) { }, { in: Config{ - ChunkEncoding: chunkenc.EncGZIP.String(), + ChunkEncoding: compression.EncGZIP.String(), FlushOpBackoff: backoff.Config{ MinBackoff: 100 * time.Millisecond, MaxBackoff: 10 * time.Second, diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 6dbd521f1abc..9ac86fbd3015 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -14,6 +14,7 @@ import ( gokitlog "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/dskit/httpgrpc" @@ -276,7 +277,7 @@ func TestStreamIterator(t *testing.T) { {"gzipChunk", func() *chunkenc.MemChunk { chunkfmt, headfmt := defaultChunkFormat(t) - return chunkenc.NewMemChunk(chunkfmt, chunkenc.EncGZIP, headfmt, 256*1024, 0) + return chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { diff --git a/pkg/storage/bloom/v1/archive.go b/pkg/storage/bloom/v1/archive.go index fcc3294eba97..201b071b2500 100644 --- a/pkg/storage/bloom/v1/archive.go +++ b/pkg/storage/bloom/v1/archive.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" ) type TarEntry struct { @@ -23,7 +23,7 @@ func TarGz(dst io.Writer, reader BlockReader) error { return errors.Wrap(err, "error getting tar entries") } - gzipper := chunkenc.GetWriterPool(chunkenc.EncGZIP).GetWriter(dst) + gzipper := compression.GetWriterPool(compression.EncGZIP).GetWriter(dst) defer gzipper.Close() tarballer := tar.NewWriter(gzipper) @@ -50,7 +50,7 @@ func TarGz(dst io.Writer, reader BlockReader) error { } func UnTarGz(dst string, r io.Reader) error { - gzipper, err := chunkenc.GetReaderPool(chunkenc.EncGZIP).GetReader(r) + gzipper, err := compression.GetReaderPool(compression.EncGZIP).GetReader(r) if err != nil { return errors.Wrap(err, "error getting gzip reader") } diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index 401cc56a218c..e0d2f69a1c84 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" v2 "github.com/grafana/loki/v3/pkg/iter/v2" ) @@ -24,7 +24,7 @@ func TestArchive(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index 878f254abc17..b77af18d1ace 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/encoding" "github.com/grafana/loki/v3/pkg/util/mempool" @@ -71,7 +71,7 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error { return nil } -func LazyDecodeBloomPage(r io.Reader, alloc mempool.Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { +func LazyDecodeBloomPage(r io.Reader, alloc mempool.Allocator, pool compression.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { data, err := alloc.Get(page.Len) if err != nil { return nil, errors.Wrap(err, "allocating buffer") @@ -316,7 +316,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc mempool.Allocator, return nil, false, errors.Wrap(err, "seeking to bloom page") } - if b.schema.encoding == chunkenc.EncNone { + if b.schema.encoding == compression.EncNone { res, err = LazyDecodeBloomPageNoCompression(r, alloc, page) } else { res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index 7e8f5c4c9993..b5145f5f9309 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/dskit/multierror" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/iter" v2 "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/logproto" @@ -104,7 +105,7 @@ func TestTokenizerPopulate(t *testing.T) { {Name: "pod", Value: "loki-1"}, {Name: "trace_id", Value: "3bef3c91643bde73"}, } - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, @@ -149,7 +150,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { {Name: "pod", Value: "loki-1"}, {Name: "trace_id", Value: "3bef3c91643bde73"}, } - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, @@ -186,7 +187,7 @@ func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { } func chunkRefItrFromMetadata(metadata ...push.LabelsAdapter) (iter.EntryIterator, error) { - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) for i, md := range metadata { if _, err := memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, int64(i)), @@ -272,7 +273,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) - memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), Line: testLine, diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 08f92631b164..1e5278c24d31 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/util/encoding" ) @@ -66,7 +66,7 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) { enc.PutBE64(b.BlockSize) } -func NewBlockOptions(enc chunkenc.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { +func NewBlockOptions(enc compression.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ version: CurrentSchemaVersion, encoding: enc, @@ -122,7 +122,7 @@ func (w *PageWriter) Add(item []byte) (offset int) { return offset } -func (w *PageWriter) writePage(writer io.Writer, pool chunkenc.WriterPool, crc32Hash hash.Hash32) (int, int, error) { +func (w *PageWriter) writePage(writer io.Writer, pool compression.WriterPool, crc32Hash hash.Hash32) (int, int, error) { // write the number of blooms in this page, must not be varint // so we can calculate it's position+len during decoding w.enc.PutBE64(uint64(w.n)) diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 8db825e79965..3664b60d515f 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -9,18 +9,18 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/util/encoding" "github.com/grafana/loki/v3/pkg/util/mempool" ) -var blockEncodings = []chunkenc.Encoding{ - chunkenc.EncNone, - chunkenc.EncGZIP, - chunkenc.EncSnappy, - chunkenc.EncLZ4_256k, - chunkenc.EncZstd, +var blockEncodings = []compression.Encoding{ + compression.EncNone, + compression.EncGZIP, + compression.EncSnappy, + compression.EncLZ4_256k, + compression.EncZstd, } func TestBlockOptions_RoundTrip(t *testing.T) { @@ -28,7 +28,7 @@ func TestBlockOptions_RoundTrip(t *testing.T) { opts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, nGramLength: 10, nGramSkip: 2, }, @@ -205,7 +205,7 @@ func TestMergeBuilder(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -302,7 +302,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -399,7 +399,7 @@ func TestBlockReset(t *testing.T) { schema := Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, nGramLength: 10, nGramSkip: 2, } @@ -457,9 +457,9 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, // test with different encodings? - nGramLength: 4, // needs to match values from MkBasicSeriesWithBlooms - nGramSkip: 0, // needs to match values from MkBasicSeriesWithBlooms + encoding: compression.EncSnappy, // test with different encodings? + nGramLength: 4, // needs to match values from MkBasicSeriesWithBlooms + nGramSkip: 0, // needs to match values from MkBasicSeriesWithBlooms }, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 4f33a9130938..befa5a7a9fa5 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -13,7 +13,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" v2 "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/v3/pkg/util/mempool" @@ -61,7 +61,7 @@ func TestFusedQuerier(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -155,7 +155,7 @@ func TestFuseMultiPage(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, nGramLength: 3, // we test trigrams nGramSkip: 0, }, @@ -312,7 +312,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10, // So we force one series per page @@ -370,7 +370,7 @@ func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) { BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncNone, + encoding: compression.EncNone, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -431,7 +431,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, }, SeriesPageSize: 256 << 10, // 256k BloomPageSize: 1 << 20, // 1MB diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index 6fd862165423..dd532b61559f 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -6,7 +6,7 @@ import ( "github.com/pkg/errors" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/util/encoding" ) @@ -39,14 +39,14 @@ var ( type Schema struct { version Version - encoding chunkenc.Encoding + encoding compression.Encoding nGramLength, nGramSkip uint64 } func NewSchema() Schema { return Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncNone, + encoding: compression.EncNone, nGramLength: 0, nGramSkip: 0, } @@ -78,12 +78,12 @@ func (s Schema) Len() int { return 4 + 1 + 1 + 8 + 8 } -func (s *Schema) DecompressorPool() chunkenc.ReaderPool { - return chunkenc.GetReaderPool(s.encoding) +func (s *Schema) DecompressorPool() compression.ReaderPool { + return compression.GetReaderPool(s.encoding) } -func (s *Schema) CompressorPool() chunkenc.WriterPool { - return chunkenc.GetWriterPool(s.encoding) +func (s *Schema) CompressorPool() compression.WriterPool { + return compression.GetWriterPool(s.encoding) } func (s *Schema) Encode(enc *encoding.Encbuf) { @@ -118,8 +118,8 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error { return errors.Errorf("invalid version. expected %d, got %d", 3, s.version) } - s.encoding = chunkenc.Encoding(dec.Byte()) - if _, err := chunkenc.ParseEncoding(s.encoding.String()); err != nil { + s.encoding = compression.Encoding(dec.Byte()) + if _, err := compression.ParseEncoding(s.encoding.String()); err != nil { return errors.Wrap(err, "parsing encoding") } diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index 3bca46865c75..6de5c85be4b8 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -9,7 +9,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/pkg/push" @@ -30,7 +30,7 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT BlockOptions{ Schema: Schema{ version: CurrentSchemaVersion, - encoding: chunkenc.EncSnappy, + encoding: compression.EncSnappy, nGramLength: 4, // see DefaultNGramLength in bloom_tokenizer_test.go nGramSkip: 0, // see DefaultNGramSkip in bloom_tokenizer_test.go }, diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go index 2fb45e13d63e..2ef08daad893 100644 --- a/pkg/storage/bloom/v1/versioned_builder_test.go +++ b/pkg/storage/bloom/v1/versioned_builder_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" v2 "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/v3/pkg/util/encoding" "github.com/grafana/loki/v3/pkg/util/mempool" @@ -14,7 +14,7 @@ import ( // smallBlockOpts returns a set of block options that are suitable for testing // characterized by small page sizes -func smallBlockOpts(v Version, enc chunkenc.Encoding) BlockOptions { +func smallBlockOpts(v Version, enc compression.Encoding) BlockOptions { return BlockOptions{ Schema: Schema{ version: v, @@ -35,7 +35,7 @@ func setup(v Version) (BlockOptions, []SeriesWithBlooms, BlockWriter, BlockReade bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) reader := NewByteReader(indexBuf, bloomsBuf) - return smallBlockOpts(v, chunkenc.EncNone), data, writer, reader + return smallBlockOpts(v, compression.EncNone), data, writer, reader } func TestV3Roundtrip(t *testing.T) { diff --git a/pkg/storage/chunk/cache/cache_test.go b/pkg/storage/chunk/cache/cache_test.go index c6ab61666b88..2f236c1f40e4 100644 --- a/pkg/storage/chunk/cache/cache_test.go +++ b/pkg/storage/chunk/cache/cache_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -34,7 +35,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str for i := 0; i < 111; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) _, err := cs.Append(&logproto.Entry{ Timestamp: ts.Time(), diff --git a/pkg/storage/chunk/client/grpc/grpc_client_test.go b/pkg/storage/chunk/client/grpc/grpc_client_test.go index b0bcffce91eb..d40d825a9442 100644 --- a/pkg/storage/chunk/client/grpc/grpc_client_test.go +++ b/pkg/storage/chunk/client/grpc/grpc_client_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/config" @@ -81,7 +82,7 @@ func TestGrpcStore(t *testing.T) { newChunkData := func() chunk.Data { return chunkenc.NewFacade( chunkenc.NewMemChunk( - chunkenc.ChunkFormatV3, chunkenc.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0, + chunkenc.ChunkFormatV3, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0, ), 0, 0) } diff --git a/pkg/storage/chunk/client/testutils/testutils.go b/pkg/storage/chunk/client/testutils/testutils.go index b34e75a6a166..e436c1335f21 100644 --- a/pkg/storage/chunk/client/testutils/testutils.go +++ b/pkg/storage/chunk/client/testutils/testutils.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk" @@ -86,7 +87,7 @@ func CreateChunks(scfg config.SchemaConfig, startIndex, batchSize int, from mode } func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk { - cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncGZIP, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) for ts := from; ts <= through; ts = ts.Add(15 * time.Second) { _, err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)}) diff --git a/pkg/storage/chunk/fetcher/fetcher_test.go b/pkg/storage/chunk/fetcher/fetcher_test.go index 03efc9afdc80..58123957919b 100644 --- a/pkg/storage/chunk/fetcher/fetcher_test.go +++ b/pkg/storage/chunk/fetcher/fetcher_test.go @@ -13,6 +13,7 @@ import ( "golang.org/x/exp/slices" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" @@ -311,7 +312,7 @@ func makeChunks(now time.Time, tpls ...c) []chunk.Chunk { from := int(chk.from) / int(time.Hour) // This is only here because it's helpful for debugging. // This isn't even the write format for Loki but we dont' care for the sake of these tests. - memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) + memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncNone, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 0) // To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data for i := 0; i < from; i++ { _, _ = memChk.Append(&logproto.Entry{ diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index 74257a8ba6ad..b2d01d2e41e0 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -103,7 +104,7 @@ func fillStore(cm storage.ClientMetrics) error { labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkfmt, chunkenc.EncLZ4_4M, headfmt, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864) for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { entry := &logproto.Entry{ Timestamp: time.Unix(0, ts), @@ -126,7 +127,7 @@ func fillStore(cm storage.ClientMetrics) error { if flushCount >= maxChunks { return } - chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncLZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864) + chunkEnc = chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncLZ4_64k, chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 262144, 1572864) } } }(i) diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 101c906b8b4f..c509783d8661 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/util/httpreq" @@ -2035,7 +2036,7 @@ func TestQueryReferencingStructuredMetadata(t *testing.T) { metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkfmt, chunkenc.EncLZ4_4M, headfmt, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkfmt, compression.EncLZ4_4M, headfmt, 262144, 1572864) for ts := chkFrom; !ts.After(chkThrough); ts = ts.Add(time.Second) { entry := logproto.Entry{ Timestamp: ts, diff --git a/pkg/storage/stores/series/series_store_test.go b/pkg/storage/stores/series/series_store_test.go index 553ea945f94f..3bd136cb3b61 100644 --- a/pkg/storage/stores/series/series_store_test.go +++ b/pkg/storage/stores/series/series_store_test.go @@ -18,6 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" @@ -752,7 +753,7 @@ func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, fo samples := 1 chunkStart := now.Add(-time.Hour) - chk := chunkenc.NewMemChunk(format, chunkenc.EncGZIP, headfmt, 256*1024, 0) + chk := chunkenc.NewMemChunk(format, compression.EncGZIP, headfmt, 256*1024, 0) for i := 0; i < samples; i++ { ts := time.Duration(i) * 15 * time.Second dup, err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)}) diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index f58ec1a730c5..a24608675a3d 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/fetcher" @@ -92,7 +93,7 @@ func TestChunkWriter_PutOne(t *testing.T) { chunkfmt, headfmt, err := periodConfig.ChunkFormat() require.NoError(t, err) - memchk := chunkenc.NewMemChunk(chunkfmt, chunkenc.EncGZIP, headfmt, 256*1024, 0) + memchk := chunkenc.NewMemChunk(chunkfmt, compression.EncGZIP, headfmt, 256*1024, 0) chk := chunk.NewChunk("fake", model.Fingerprint(0), []labels.Label{{Name: "foo", Value: "bar"}}, chunkenc.NewFacade(memchk, 0, 0), 100, 400) for name, tc := range map[string]struct { diff --git a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go index 6f1b0326a5cc..a7ea7af3b05e 100644 --- a/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go +++ b/pkg/storage/stores/shipper/indexshipper/boltdb/compactor/util.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" ingesterclient "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/storage/chunk" @@ -31,7 +32,7 @@ func createChunk(t testing.TB, chunkFormat byte, headBlockFmt chunkenc.HeadBlock labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := ingesterclient.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncSnappy, headBlockFmt, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkFormat, compression.EncSnappy, headBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { dup, err := chunkEnc.Append(&logproto.Entry{ diff --git a/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go b/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go index 19bf88842b02..36dc13850956 100644 --- a/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go +++ b/pkg/storage/stores/shipper/indexshipper/uploads/index_set.go @@ -11,7 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/index" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -145,8 +145,9 @@ func (t *indexSet) uploadIndex(ctx context.Context, idx index.Index) error { } }() - compressedWriter := chunkenc.Gzip.GetWriter(f) - defer chunkenc.Gzip.PutWriter(compressedWriter) + gzipPool := compression.GetWriterPool(compression.EncGZIP) + compressedWriter := gzipPool.GetWriter(f) + defer gzipPool.PutWriter(compressedWriter) idxReader, err := idx.Reader() if err != nil { diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 5ef02e74b1ca..dd535197afb3 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" @@ -108,7 +109,7 @@ func newChunk(chunkFormat byte, headBlockFmt chunkenc.HeadBlockFmt, stream logpr lbs = builder.Labels() } from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp) - chk := chunkenc.NewMemChunk(chunkFormat, chunkenc.EncGZIP, headBlockFmt, 256*1024, 0) + chk := chunkenc.NewMemChunk(chunkFormat, compression.EncGZIP, headBlockFmt, 256*1024, 0) for _, e := range stream.Entries { _, _ = chk.Append(&e) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 75128607b882..58e93b3937d8 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -19,8 +19,8 @@ import ( "golang.org/x/time/rate" "gopkg.in/yaml.v2" - "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/compactor/deletionmode" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/distributor/shardstreams" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logql" @@ -496,7 +496,7 @@ func (l *Limits) Validate() error { return errors.Wrap(err, "invalid tsdb sharding strategy") } - if _, err := chunkenc.ParseEncoding(l.BloomBlockEncoding); err != nil { + if _, err := compression.ParseEncoding(l.BloomBlockEncoding); err != nil { return err } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 2d4457c2a119..87fab6837029 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -12,8 +12,8 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" - "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/compactor/deletionmode" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logql" ) @@ -339,7 +339,7 @@ func TestLimitsValidation(t *testing.T) { }, { limits: Limits{DeletionMode: "disabled", BloomBlockEncoding: "unknown"}, - expected: fmt.Errorf("invalid encoding: unknown, supported: %s", chunkenc.SupportedEncoding()), + expected: fmt.Errorf("invalid encoding: unknown, supported: %s", compression.SupportedEncoding()), }, } { desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding) diff --git a/tools/tsdb/migrate-versions/main.go b/tools/tsdb/migrate-versions/main.go index 8469cd560711..e4fb39e69a4f 100644 --- a/tools/tsdb/migrate-versions/main.go +++ b/tools/tsdb/migrate-versions/main.go @@ -17,7 +17,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/v3/pkg/chunkenc" + "github.com/grafana/loki/v3/pkg/compression" "github.com/grafana/loki/v3/pkg/loki" "github.com/grafana/loki/v3/pkg/storage" "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" @@ -257,8 +257,9 @@ func uploadFile(idx shipperindex.Index, indexStorageClient shipperstorage.Client } }() - compressedWriter := chunkenc.Gzip.GetWriter(f) - defer chunkenc.Gzip.PutWriter(compressedWriter) + gzipPool := compression.GetWriterPool(compression.EncGZIP) + compressedWriter := gzipPool.GetWriter(f) + defer gzipPool.PutWriter(compressedWriter) idxReader, err := idx.Reader() if err != nil {