fix: try reading chunks which have incorrect offset for blocks (#13720)
sandeepsukhani authored Aug 4, 2024
1 parent 629671e commit 7e224d5
Showing 2 changed files with 169 additions and 10 deletions.
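In short: newByteChunk now computes the offset at which each block is expected to start (from the structured-metadata section for chunk format v4, from the chunk header for v1-v3, and thereafter from the previous block's offset plus its length plus the 4-byte checksum). When a block fails validation at its encoded offset, the reader retries at the expected offset before either skipping the block on a checksum mismatch or failing the read.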
62 changes: 52 additions & 10 deletions pkg/chunkenc/memchunk.go
@@ -441,13 +441,20 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 
 	metasOffset := uint64(0)
 	metasLen := uint64(0)
+	// There is a rare issue where chunks built by Loki have incorrect offset for some blocks which causes Loki to fail to read those chunks.
+	// While the root cause is yet to be identified, we will try to read those problematic chunks using the expected offset for blocks calculated using other relative offsets in the chunk.
+	expectedBlockOffset := 0
 	if version >= ChunkFormatV4 {
-		// version >= 4 starts writing length of sections after their offsets
+		// version >= 4 starts writing length of sections before their offsets
 		metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx)
+		structuredMetadataLength, structuredMetadataOffset := readSectionLenAndOffset(chunkStructuredMetadataSectionIdx)
+		expectedBlockOffset = int(structuredMetadataLength + structuredMetadataOffset + 4)
 	} else {
 		// version <= 3 does not store length of metas. metas are followed by metasOffset + hash and then the chunk ends
 		metasOffset = binary.BigEndian.Uint64(b[len(b)-8:])
 		metasLen = uint64(len(b)-(8+4)) - metasOffset
+		// version 1 writes blocks after version number while version 2 and 3 write blocks after chunk encoding
+		expectedBlockOffset = len(b) - len(db.b)
 	}
 	mb := b[metasOffset : metasOffset+metasLen]
 	db = decbuf{b: mb}
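
The two expectedBlockOffset seeds above pin down where the first block must start: for chunk format v4 the block data begins right after the structured-metadata section and the 4-byte CRC32 that seals it, while for v1-v3 it begins right after the header bytes already consumed from the decode buffer. A minimal sketch of that arithmetic, under an illustrative package and helper name (firstBlockOffset is not part of the commit):

package chunksketch

// firstBlockOffset mirrors the two branches that seed expectedBlockOffset
// in newByteChunk.
func firstBlockOffset(version, headerLen int, structuredMetadataOffset, structuredMetadataLength uint64) int {
	if version >= 4 {
		// v4: blocks follow the structured-metadata section plus its
		// 4-byte CRC32 checksum.
		return int(structuredMetadataOffset + structuredMetadataLength + 4)
	}
	// v1-v3: blocks follow the chunk header, i.e. len(b) - len(db.b) once
	// the version (and, for v2/v3, the encoding byte) has been read.
	return headerLen
}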
@@ -476,18 +483,35 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 			blk.uncompressedSize = db.uvarint()
 		}
 		l := db.uvarint()
-		if blk.offset+l > len(b) {
-			return nil, fmt.Errorf("block %d offset %d + length %d exceeds chunk length %d", i, blk.offset, l, len(b))
-		}
-		blk.b = b[blk.offset : blk.offset+l]
 
-		// Verify checksums.
-		expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
-		if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
-			_ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
-			continue
+		invalidBlockErr := validateBlock(b, blk.offset, l)
+		if invalidBlockErr != nil {
+			level.Error(util_log.Logger).Log("msg", "invalid block found", "err", invalidBlockErr)
+			// if block is expected to have different offset than what is encoded, see if we get a valid block using expected offset
+			if blk.offset != expectedBlockOffset {
+				_ = level.Error(util_log.Logger).Log("msg", "block offset does not match expected one, will try reading with expected offset", "actual", blk.offset, "expected", expectedBlockOffset)
+				blk.offset = expectedBlockOffset
+				if err := validateBlock(b, blk.offset, l); err != nil {
+					level.Error(util_log.Logger).Log("msg", "could not find valid block using expected offset", "err", err)
+				} else {
+					invalidBlockErr = nil
+					level.Info(util_log.Logger).Log("msg", "valid block found using expected offset")
+				}
+			}
+
+			// if the block read with expected offset is still invalid, do not continue further
+			if invalidBlockErr != nil {
+				if errors.Is(invalidBlockErr, ErrInvalidChecksum) {
+					expectedBlockOffset += l + 4
+					continue
+				}
+				return nil, invalidBlockErr
+			}
 		}
 
+		// next block starts at current block start + current block length + checksum
+		expectedBlockOffset = blk.offset + l + 4
+		blk.b = b[blk.offset : blk.offset+l]
 		bc.blocks = append(bc.blocks, blk)
 
 		// Update the counter used to track the size of cut blocks.
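
Blocks in a chunk are laid out back to back, each immediately followed by its 4-byte big-endian CRC32 (Castagnoli), so once one block validates, the start of the next one is implied. Continuing the illustrative chunksketch package, here is a self-contained, simplified sketch of the fallback above; unlike the real loop it returns an error instead of skipping checksum-failed blocks, and blockMeta and recoverBlocks are stand-ins rather than the real MemChunk types:

package chunksketch

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

type blockMeta struct{ offset, length int }

// validate checks that the block fits inside the chunk and that the 4 bytes
// right after it hold the block's CRC32, mirroring validateBlock below.
func validate(chunk []byte, offset, length int) error {
	if offset < 0 || offset+length+4 > len(chunk) {
		return fmt.Errorf("offset %d + length %d exceeds chunk length %d", offset, length, len(chunk))
	}
	if binary.BigEndian.Uint32(chunk[offset+length:]) != crc32.Checksum(chunk[offset:offset+length], castagnoli) {
		return errors.New("invalid checksum")
	}
	return nil
}

// recoverBlocks trusts each encoded offset while it validates, and otherwise
// retries at the offset implied by the previous block: its offset plus its
// length plus the 4-byte checksum.
func recoverBlocks(chunk []byte, metas []blockMeta, firstOffset int) ([][]byte, error) {
	expected := firstOffset
	blocks := make([][]byte, 0, len(metas))
	for _, m := range metas {
		offset := m.offset
		if validate(chunk, offset, m.length) != nil && offset != expected {
			offset = expected // retry with the layout-implied offset
		}
		if err := validate(chunk, offset, m.length); err != nil {
			return nil, err
		}
		// next block starts after this block's data and checksum
		expected = offset + m.length + 4
		blocks = append(blocks, chunk[offset:offset+m.length])
	}
	return blocks, nil
}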
@@ -1696,3 +1720,21 @@ func (e *sampleBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLa
 func (e *sampleBufferedIterator) At() logproto.Sample {
 	return e.cur
 }
+
+// validateBlock validates block by doing following checks:
+// 1. Offset+length do not overrun size of the chunk from which we are reading the block.
+// 2. Checksum of the block we will read matches the stored checksum in the chunk.
+func validateBlock(chunkBytes []byte, offset, length int) error {
+	if offset+length > len(chunkBytes) {
+		return fmt.Errorf("offset %d + length %d exceeds chunk length %d", offset, length, len(chunkBytes))
+	}
+
+	blockBytes := chunkBytes[offset : offset+length]
+	// Verify checksums.
+	expCRC := binary.BigEndian.Uint32(chunkBytes[offset+length:])
+	if expCRC != crc32.Checksum(blockBytes, castagnoliTable) {
+		return ErrInvalidChecksum
+	}
+
+	return nil
+}
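
For reference, the framing validateBlock expects (block bytes immediately followed by their big-endian CRC32) can be produced like this; appendFramedBlock is an illustrative helper in the same sketch package, not the chunk writer's actual code path:

// appendFramedBlock writes <block bytes><4-byte big-endian CRC32>, which is
// exactly the layout validateBlock reads back; it reuses the castagnoli
// table from the sketch above.
func appendFramedBlock(dst, block []byte) []byte {
	dst = append(dst, block...)
	var crc [4]byte
	binary.BigEndian.PutUint32(crc[:], crc32.Checksum(block, castagnoli))
	return append(dst, crc[:]...)
}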
117 changes: 117 additions & 0 deletions pkg/chunkenc/memchunk_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/binary"
 	"fmt"
+	"hash"
 	"math"
 	"math/rand"
 	"sort"
@@ -2044,3 +2045,119 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) {
 		})
 	}
 }
+
+func TestDecodeChunkIncorrectBlockOffset(t *testing.T) {
+	// use small block size to build multiple blocks in the test chunk
+	blockSize := 10
+
+	for _, format := range allPossibleFormats {
+		t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
+			for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ {
t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) {
+					chk := NewMemChunk(format.chunkFormat, EncNone, format.headBlockFmt, blockSize, testTargetSize)
+					ts := time.Now().Unix()
+					for i := 0; i < 3; i++ {
+						dup, err := chk.Append(&logproto.Entry{
+							Timestamp: time.Now(),
+							Line:      fmt.Sprintf("%d-%d", ts, i),
+							StructuredMetadata: []logproto.LabelAdapter{
+								{Name: "foo", Value: fmt.Sprintf("%d-%d", ts, i)},
+							},
+						})
+						require.NoError(t, err)
+						require.False(t, dup)
+					}
+
+					require.Len(t, chk.blocks, 3)
+
+					b, err := chk.Bytes()
+					require.NoError(t, err)
+
+					metasOffset := binary.BigEndian.Uint64(b[len(b)-8:])
+
+					w := bytes.NewBuffer(nil)
+					eb := EncodeBufferPool.Get().(*encbuf)
+					defer EncodeBufferPool.Put(eb)
+
+					crc32Hash := crc32HashPool.Get().(hash.Hash32)
+					defer crc32HashPool.Put(crc32Hash)
+
+					crc32Hash.Reset()
+					eb.reset()
+
+					// BEGIN - code copied from writeTo func starting from encoding of block metas to change offset of a block
+					eb.putUvarint(len(chk.blocks))
+
+					for i, b := range chk.blocks {
+						eb.putUvarint(b.numEntries)
+						eb.putVarint64(b.mint)
+						eb.putVarint64(b.maxt)
+						// change offset of one block
+						blockOffset := b.offset
+						if i == incorrectOffsetBlockNum {
+							blockOffset += 5
+						}
+						eb.putUvarint(blockOffset)
+						if chk.format >= ChunkFormatV3 {
+							eb.putUvarint(b.uncompressedSize)
+						}
+						eb.putUvarint(len(b.b))
+					}
+					metasLen := len(eb.get())
+					eb.putHash(crc32Hash)
+
+					_, err = w.Write(eb.get())
+					require.NoError(t, err)
+
+					if chk.format >= ChunkFormatV4 {
+						// Write structured metadata offset and length
+						eb.reset()
+
+						eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-32:])))
+						eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-24:])))
+						_, err = w.Write(eb.get())
+						require.NoError(t, err)
+					}
+
+					// Write the metasOffset.
+					eb.reset()
+					if chk.format >= ChunkFormatV4 {
+						eb.putBE64int(metasLen)
+					}
+					eb.putBE64int(int(metasOffset))
+					_, err = w.Write(eb.get())
+					require.NoError(t, err)
+					// END - code copied from writeTo func
+
+					// build chunk using pre-block meta section + rewritten remainder of the chunk with incorrect offset for a block
+					chkWithIncorrectOffset := make([]byte, int(metasOffset)+w.Len())
+					copy(chkWithIncorrectOffset, b[:metasOffset])
+					copy(chkWithIncorrectOffset[metasOffset:], w.Bytes())
+
+					// decoding the problematic chunk should succeed
+					decodedChkWithIncorrectOffset, err := newByteChunk(chkWithIncorrectOffset, blockSize, testTargetSize, false)
+					require.NoError(t, err)
+
+					require.Len(t, decodedChkWithIncorrectOffset.blocks, len(chk.blocks))
+
+					// both chunks should have same log lines
+					origChunkItr, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
+					require.NoError(t, err)
+
+					corruptChunkItr, err := decodedChkWithIncorrectOffset.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
+					require.NoError(t, err)
+
+					numEntriesFound := 0
+					for origChunkItr.Next() {
+						numEntriesFound++
+						require.True(t, corruptChunkItr.Next())
+						require.Equal(t, origChunkItr.At(), corruptChunkItr.At())
+					}
+
+					require.False(t, corruptChunkItr.Next())
+					require.Equal(t, 3, numEntriesFound)
+				})
+			}
+		})
+	}
+}
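
The test deliberately shifts exactly one block's encoded offset by 5 bytes while leaving the block data itself untouched, so decoding can only succeed through the expected-offset fallback; it then walks the original and the rewritten chunk side by side to confirm that no entries were lost or reordered.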
