Skip to content

Commit

Permalink
Use a FileReader when decoding the symbol table.
Browse files Browse the repository at this point in the history
Note that this implementation is not goroutine safe - I've only done
the minimum needed to switch from a BufReader to a FileReader.
  • Loading branch information
charleskorn committed Nov 29, 2022
1 parent 1316da0 commit ee9f51a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 77 deletions.
102 changes: 62 additions & 40 deletions pkg/storegateway/indexheader/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package index

import (
"fmt"
"encoding/binary"
"hash/crc32"
"os"
"sort"
"unsafe"

Expand All @@ -41,10 +42,12 @@ type ByteSlice interface {
}

type Symbols struct {
bs ByteSlice
r stream_encoding.Reader
version int
symbolTablePosition int
// TODO: we shouldn't be sharing a single file descriptor here, as we will use it from multiple goroutines simultaneously -
// pass in file path and create readers when required?
f *os.File
version int
tableLength int
tableOffset int

offsets []int
seen int
Expand All @@ -53,53 +56,70 @@ type Symbols struct {
const symbolFactor = 32

// NewSymbols returns a Symbols object for symbol lookups.
// bs should contain the Decbuf-encoded symbol table, including leading length and trailing checksum bytes.
// symbolTablePosition should be the offset, from the beginning of the index file, of the symbol table.
func NewSymbols(bs ByteSlice, version, symbolTablePosition int) (*Symbols, error) {
r, err := stream_encoding.NewBufReader(bs)
// f should contain a Decbuf-encoded symbol table at offset.
func NewSymbols(f *os.File, version, offset int) (*Symbols, error) {
lengthBytes := make([]byte, 4)
n, err := f.ReadAt(lengthBytes, int64(offset))
if err != nil {
return nil, err
}
if n != 4 {
return nil, errors.Wrapf(stream_encoding.ErrInvalidSize, "insufficient bytes read for symbol table size (got %d, wanted %d)", n, 4)
}

s := &Symbols{
bs: bs,
r: r,
version: version,
symbolTablePosition: symbolTablePosition,
f: f,
version: version,
tableLength: len(lengthBytes) + int(binary.BigEndian.Uint32(lengthBytes)) + 4,
tableOffset: offset,
}

r, err := stream_encoding.NewFileReader(f, offset, s.tableLength)
if err != nil {
return nil, errors.Wrap(err, "create symbol table file reader")
}

d := stream_encoding.NewDecbuf(r, 0, castagnoliTable)
if d.Err() != nil {
fmt.Printf("error: %v\n", d.Err())
return nil, errors.Wrap(err, "decode symbol table")
}

var (
origLen = d.Len()
cnt = d.Be32int()
basePos = 4
)
origLen := d.Len()
cnt := d.Be32int()
basePos := 4
s.offsets = make([]int, 0, 1+cnt/symbolFactor)
for d.Err() == nil && s.seen < cnt {
//fmt.Printf("seen=%d cnt=%d\n", s.seen, cnt)

if s.seen%symbolFactor == 0 {
s.offsets = append(s.offsets, basePos+origLen-d.Len())
}
d.UvarintBytes() // The symbol.
s.seen++
}
//fmt.Printf("finished\n")

if d.Err() != nil {
return nil, d.Err()
}
//fmt.Printf("finished good\n")

return s, nil
}

func (s Symbols) Lookup(o uint32) (string, error) {
// r := stream_encoding.NewBufReader(s.bs)
// newRawDecbuf returns a Decbuf for reading the contents of this symbol table.
// It does not check the integrity of the symbol table, as it is assumed that this is
// done in NewSymbols.
func (s Symbols) newRawDecbuf() (stream_encoding.Decbuf, error) {
r, err := stream_encoding.NewFileReader(s.f, s.tableOffset, s.tableLength)
if err != nil {
return stream_encoding.Decbuf{}, errors.Wrap(err, "create symbol table file reader")
}

return stream_encoding.NewDecbufRawReader(r), nil
}

d := stream_encoding.NewDecbufRawReader(s.r)
func (s Symbols) Lookup(o uint32) (string, error) {
d, err := s.newRawDecbuf()
if err != nil {
return "", err
}

if s.version == index.FormatV2 {
if int(o) >= s.seen {
Expand All @@ -114,7 +134,7 @@ func (s Symbols) Lookup(o uint32) (string, error) {
// In v1, o is relative to the beginning of the whole index header file, so we
// need to adjust for the fact our view into the file starts at the beginning
// of the symbol table.
offsetInTable := int(o) - s.symbolTablePosition
offsetInTable := int(o) - s.tableOffset
d.Skip(offsetInTable)
}
sym := d.UvarintStr()
Expand All @@ -128,22 +148,24 @@ func (s Symbols) ReverseLookup(sym string) (uint32, error) {
if len(s.offsets) == 0 {
return 0, errors.Errorf("unknown symbol %q - no symbols", sym)
}

i := sort.Search(len(s.offsets), func(i int) bool {
// Any decoding errors here will be lost, however
// we already read through all of this at startup.
d := stream_encoding.NewDecbufRawReader(s.r)
// d := stream_encoding.NewDecbufRaw2(s.bs)
//d := encoding.Decbuf{
// B: s.bs.Range(0, s.bs.Len()),
//}
// TODO: don't create a new Decbuf instance for every call of this function -
// instead, add a Seek() method to Decbuf and use the one instance for the entirety of ReverseLookup()
d, err := s.newRawDecbuf()
if err != nil {
panic(err)
}

d.Skip(s.offsets[i])
return yoloString(d.UvarintBytes()) > sym
})
//d := stream_encoding.NewDecbufRaw2(s.bs)
d := stream_encoding.NewDecbufRawReader(s.r)
//d := encoding.Decbuf{
// B: s.bs.Range(0, s.bs.Len()),
//}

d, err := s.newRawDecbuf()
if err != nil {
return 0, err
}

if i > 0 {
i--
}
Expand All @@ -168,7 +190,7 @@ func (s Symbols) ReverseLookup(sym string) (uint32, error) {
if s.version == index.FormatV2 {
return uint32(res), nil
}
return uint32(s.bs.Len() - lastLen), nil
return uint32(s.tableLength - lastLen), nil
}

func (s Symbols) Size() int {
Expand Down
28 changes: 12 additions & 16 deletions pkg/storegateway/indexheader/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package index

import (
"hash/crc32"
"os"
"path"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -30,20 +32,6 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

type realByteSlice []byte

func (b realByteSlice) Len() int {
return len(b)
}

func (b realByteSlice) Range(start, end int) []byte {
return b[start:end]
}

func (b realByteSlice) Sub(start, end int) index.ByteSlice {
return b[start:end]
}

func TestSymbols(t *testing.T) {
buf := encoding.Encbuf{}

Expand All @@ -60,9 +48,17 @@ func TestSymbols(t *testing.T) {
checksum := crc32.Checksum(buf.Get()[symbolsStart+4:], castagnoliTable)
buf.PutBE32(checksum) // Check sum at the end.

d := stream_encoding.NewDecbufAt(realByteSlice(buf.Get()), symbolsStart, castagnoliTable)
dir := t.TempDir()
filePath := path.Join(dir, "index")
require.NoError(t, os.WriteFile(filePath, buf.Get(), 0700))

f, err := os.Open(filePath)
require.NoError(t, err)
r, err := stream_encoding.NewFileReader(f, 0, buf.Len())
require.NoError(t, err)
d := stream_encoding.NewDecbuf(r, symbolsStart, castagnoliTable)
require.NoError(t, d.E)
s, err := NewSymbols(realByteSlice(buf.Get()[symbolsStart:]), index.FormatV2, symbolsStart)
s, err := NewSymbols(f, index.FormatV2, symbolsStart)
require.NoError(t, err)

// We store only 4 offsets to symbols.
Expand Down
23 changes: 2 additions & 21 deletions pkg/storegateway/indexheader/stream_binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,11 @@ func newFileStreamBinaryReader(path string, postingOffsetsInMemSampling int) (bw
return nil, errors.Wrap(err, "read index header TOC")
}

symbolsByteSlice, err := readDecbufBytes(f, int64(r.toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "read symbols")
}

lengthBytes := make([]byte, 4)
n, err = f.ReadAt(lengthBytes, int64(r.toc.Symbols))
if err != nil {
return nil, err
}
//length := len(lengthBytes) + int(binary.BigEndian.Uint32(lengthBytes)) + 4

//fr := stream_encoding.NewFileReader(f, int(r.toc.Symbols), length)
r.symbols, err = stream_index.NewSymbols(symbolsByteSlice, r.indexVersion, int(r.toc.Symbols))
r.symbols, err = stream_index.NewSymbols(f, r.indexVersion, int(r.toc.Symbols))
if err != nil {
return nil, errors.Wrap(err, "load symbols")
}

// TODO(bwplotka): Consider contributing to Prometheus to allow specifying custom number for symbolsFactor.
// r.symbols, err = stream_index.NewSymbols(symbolsByteSlice, r.indexVersion, 0)
// if err != nil {
// return nil, errors.Wrap(err, "read symbols")
// }

var lastKey []string
if r.indexVersion == index.FormatV1 {
// Earlier V1 formats don't have a sorted postings offset table, so
Expand Down Expand Up @@ -326,7 +307,7 @@ func newBinaryTOCFromFile(f *os.File) (*BinaryTOC, error) {
Symbols: d.Be64(),
PostingsOffsetTable: d.Be64(),
}

if err := d.Err(); err != nil {
return nil, err
}
Expand Down

0 comments on commit ee9f51a

Please sign in to comment.