diff --git a/src/dbnode/encoding/encoding.go b/src/dbnode/encoding/encoding.go index ed9a2f62ec..4d65f5e921 100644 --- a/src/dbnode/encoding/encoding.go +++ b/src/dbnode/encoding/encoding.go @@ -52,7 +52,7 @@ func LeadingAndTrailingZeros(v uint64) (int, int) { } // SignExtend sign extends the highest bit of v which has numBits (<=64) -func SignExtend(v uint64, numBits int) int64 { - shift := uint(64 - numBits) +func SignExtend(v uint64, numBits uint) int64 { + shift := 64 - numBits return (int64(v) << shift) >> shift } diff --git a/src/dbnode/encoding/encoding_mock.go b/src/dbnode/encoding/encoding_mock.go index 8b1c8b1540..e101cf96e1 100644 --- a/src/dbnode/encoding/encoding_mock.go +++ b/src/dbnode/encoding/encoding_mock.go @@ -1344,7 +1344,7 @@ func (mr *MockIStreamMockRecorder) ReadByte() *gomock.Call { } // ReadBits mocks base method -func (m *MockIStream) ReadBits(numBits int) (uint64, error) { +func (m *MockIStream) ReadBits(numBits uint) (uint64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReadBits", numBits) ret0, _ := ret[0].(uint64) @@ -1359,7 +1359,7 @@ func (mr *MockIStreamMockRecorder) ReadBits(numBits interface{}) *gomock.Call { } // PeekBits mocks base method -func (m *MockIStream) PeekBits(numBits int) (uint64, error) { +func (m *MockIStream) PeekBits(numBits uint) (uint64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PeekBits", numBits) ret0, _ := ret[0].(uint64) @@ -1374,10 +1374,10 @@ func (mr *MockIStreamMockRecorder) PeekBits(numBits interface{}) *gomock.Call { } // RemainingBitsInCurrentByte mocks base method -func (m *MockIStream) RemainingBitsInCurrentByte() int { +func (m *MockIStream) RemainingBitsInCurrentByte() uint { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RemainingBitsInCurrentByte") - ret0, _ := ret[0].(int) + ret0, _ := ret[0].(uint) return ret0 } diff --git a/src/dbnode/encoding/istream.go b/src/dbnode/encoding/istream.go index cc6686c409..85d3569269 100644 --- a/src/dbnode/encoding/istream.go +++ b/src/dbnode/encoding/istream.go @@ -31,7 +31,7 @@ type istream struct { r *bufio.Reader // encoded stream err error // error encountered current byte // current byte we are working off of - remaining int // bits remaining in current to be read + remaining uint // bits remaining in current to be read } // NewIStream creates a new Istream @@ -93,11 +93,10 @@ func (is *istream) ReadByte() (byte, error) { } // ReadBits reads the next Bits -func (is *istream) ReadBits(numBits int) (uint64, error) { +func (is *istream) ReadBits(numBits uint) (uint64, error) { if is.err != nil { return 0, is.err } - var res uint64 for numBits >= 8 { byteRead, err := is.ReadByte() @@ -121,8 +120,8 @@ func (is *istream) ReadBits(numBits int) (uint64, error) { if is.remaining < numToRead { numToRead = is.remaining } - bits := is.current >> uint(8-numToRead) - is.current <<= uint(numToRead) + bits := is.current >> (8 - numToRead) + is.current <<= numToRead is.remaining -= numToRead res = (res << uint64(numToRead)) | uint64(bits) numBits -= numToRead @@ -131,10 +130,7 @@ func (is *istream) ReadBits(numBits int) (uint64, error) { } // PeekBits looks at the next Bits, but doesn't move the pos -func (is *istream) PeekBits(numBits int) (uint64, error) { - if is.err != nil { - return 0, is.err - } +func (is *istream) PeekBits(numBits uint) (uint64, error) { // check the last byte first if numBits <= is.remaining { return uint64(readBitsInByte(is.current, numBits)), nil @@ -152,25 +148,25 @@ func (is *istream) PeekBits(numBits int) (uint64, error) { numBitsRead += 8 } remainder := readBitsInByte(bytesRead[numBytesToRead-1], numBits-numBitsRead) - res = (res << uint(numBits-numBitsRead)) | uint64(remainder) + res = (res << (numBits - numBitsRead)) | uint64(remainder) return res, nil } // RemainingBitsInCurrentByte returns the number of bits remaining to be read in // the current byte. -func (is *istream) RemainingBitsInCurrentByte() int { +func (is *istream) RemainingBitsInCurrentByte() uint { return is.remaining } // readBitsInByte reads numBits in byte b. -func readBitsInByte(b byte, numBits int) byte { - return b >> uint(8-numBits) +func readBitsInByte(b byte, numBits uint) byte { + return b >> (8 - numBits) } // consumeBuffer consumes numBits in is.current. -func (is *istream) consumeBuffer(numBits int) byte { +func (is *istream) consumeBuffer(numBits uint) byte { res := readBitsInByte(is.current, numBits) - is.current <<= uint(numBits) + is.current <<= numBits is.remaining -= numBits return res } diff --git a/src/dbnode/encoding/istream_test.go b/src/dbnode/encoding/istream_test.go index 7658c419f1..4841590b01 100644 --- a/src/dbnode/encoding/istream_test.go +++ b/src/dbnode/encoding/istream_test.go @@ -35,7 +35,7 @@ func TestReadBits(t *testing.T) { o := NewIStream(bytes.NewReader(byteStream), 16) is := o.(*istream) - numBits := []int{1, 3, 4, 8, 7, 2, 64, 64} + numBits := []uint{1, 3, 4, 8, 7, 2, 64, 64} var res []uint64 for _, v := range numBits { read, err := is.ReadBits(v) @@ -44,11 +44,9 @@ func TestReadBits(t *testing.T) { } expected := []uint64{0x1, 0x4, 0xa, 0xfe, 0x7e, 0x3, 0x1234567890abcdef, 0x1} require.Equal(t, expected, res) - require.NoError(t, is.err) _, err := is.ReadBits(8) require.Error(t, err) - require.Error(t, is.err) } func TestPeekBitsSuccess(t *testing.T) { @@ -56,7 +54,7 @@ func TestPeekBitsSuccess(t *testing.T) { o := NewIStream(bytes.NewReader(byteStream), 16) is := o.(*istream) inputs := []struct { - numBits int + numBits uint expected uint64 }{ {0, 0}, @@ -73,9 +71,8 @@ func TestPeekBitsSuccess(t *testing.T) { require.NoError(t, err) require.Equal(t, input.expected, res) } - require.NoError(t, is.err) require.Equal(t, byte(0), is.current) - require.Equal(t, 0, is.remaining) + require.Equal(t, 0, int(is.remaining)) } func TestPeekBitsError(t *testing.T) { @@ -98,7 +95,7 @@ func TestReadAfterPeekBits(t *testing.T) { require.Error(t, err) inputs := []struct { - numBits int + numBits uint expected uint64 }{ {2, 0x2}, @@ -117,9 +114,7 @@ func TestResetIStream(t *testing.T) { o := NewIStream(bytes.NewReader(nil), 16) is := o.(*istream) is.ReadBits(1) - require.Error(t, is.err) is.Reset(bytes.NewReader(nil)) - require.NoError(t, is.err) require.Equal(t, byte(0), is.current) - require.Equal(t, 0, is.remaining) + require.Equal(t, 0, int(is.remaining)) } diff --git a/src/dbnode/encoding/m3tsz/float_encoder_iterator.go b/src/dbnode/encoding/m3tsz/float_encoder_iterator.go index fa185095d9..b27f959ac2 100644 --- a/src/dbnode/encoding/m3tsz/float_encoder_iterator.go +++ b/src/dbnode/encoding/m3tsz/float_encoder_iterator.go @@ -134,7 +134,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error cb = (cb << 1) | nextCB if cb == opcodeContainedValueXOR { previousLeading, previousTrailing := encoding.LeadingAndTrailingZeros(eit.PrevXOR) - numMeaningfulBits := 64 - previousLeading - previousTrailing + numMeaningfulBits := uint(64 - previousLeading - previousTrailing) meaningfulBits, err := stream.ReadBits(numMeaningfulBits) if err != nil { return err @@ -153,7 +153,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error numLeadingZeros := (numLeadingZeroesAndNumMeaningfulBits & bits12To6Mask) >> 6 numMeaningfulBits := (numLeadingZeroesAndNumMeaningfulBits & bits6To0Mask) + 1 - meaningfulBits, err := stream.ReadBits(int(numMeaningfulBits)) + meaningfulBits, err := stream.ReadBits(uint(numMeaningfulBits)) if err != nil { return err } diff --git a/src/dbnode/encoding/m3tsz/iterator.go b/src/dbnode/encoding/m3tsz/iterator.go index 4b0614c55c..b2f7e69fa2 100644 --- a/src/dbnode/encoding/m3tsz/iterator.go +++ b/src/dbnode/encoding/m3tsz/iterator.go @@ -165,10 +165,10 @@ func (it *readerIterator) readIntValDiff() { sign = 1.0 } - it.intVal += sign * float64(it.readBits(int(it.sig))) + it.intVal += sign * float64(it.readBits(uint(it.sig))) } -func (it *readerIterator) readBits(numBits int) uint64 { +func (it *readerIterator) readBits(numBits uint) uint64 { if !it.hasNext() { return 0 } diff --git a/src/dbnode/encoding/m3tsz/timestamp_iterator.go b/src/dbnode/encoding/m3tsz/timestamp_iterator.go index ecc7ce4194..ca3402607c 100644 --- a/src/dbnode/encoding/m3tsz/timestamp_iterator.go +++ b/src/dbnode/encoding/m3tsz/timestamp_iterator.go @@ -48,13 +48,21 @@ type TimestampIterator struct { // schemes. Setting SkipMarkers to true disables the look ahead behavior // for situations where looking ahead is not safe. SkipMarkers bool + + numValueBits uint + numBits uint + markerEncodingScheme encoding.MarkerEncodingScheme } // NewTimestampIterator creates a new TimestampIterator. func NewTimestampIterator(opts encoding.Options, skipMarkers bool) TimestampIterator { + mes := opts.MarkerEncodingScheme() return TimestampIterator{ - Opts: opts, - SkipMarkers: skipMarkers, + Opts: opts, + SkipMarkers: skipMarkers, + numValueBits: uint(mes.NumValueBits()), + numBits: uint(mes.NumOpcodeBits() + mes.NumValueBits()), + markerEncodingScheme: mes, } } @@ -137,32 +145,30 @@ func (it *TimestampIterator) readNextTimestamp(stream encoding.IStream) error { } func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Duration, bool, error) { - mes := it.Opts.MarkerEncodingScheme() - numBits := mes.NumOpcodeBits() + mes.NumValueBits() - opcodeAndValue, success := it.tryPeekBits(stream, numBits) + opcodeAndValue, success := it.tryPeekBits(stream, it.numBits) if !success { return 0, false, nil } - opcode := opcodeAndValue >> uint(mes.NumValueBits()) - if opcode != mes.Opcode() { + opcode := opcodeAndValue >> it.numValueBits + if opcode != it.markerEncodingScheme.Opcode() { return 0, false, nil } var ( - valueMask = (1 << uint(mes.NumValueBits())) - 1 + valueMask = (1 << it.numValueBits) - 1 markerValue = int64(opcodeAndValue & uint64(valueMask)) ) switch encoding.Marker(markerValue) { - case mes.EndOfStream(): - _, err := stream.ReadBits(numBits) + case it.markerEncodingScheme.EndOfStream(): + _, err := stream.ReadBits(it.numBits) if err != nil { return 0, false, err } it.Done = true return 0, true, nil - case mes.Annotation(): - _, err := stream.ReadBits(numBits) + case it.markerEncodingScheme.Annotation(): + _, err := stream.ReadBits(it.numBits) if err != nil { return 0, false, err } @@ -175,8 +181,8 @@ func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Durati return 0, false, err } return markerOrDOD, true, nil - case mes.TimeUnit(): - _, err := stream.ReadBits(numBits) + case it.markerEncodingScheme.TimeUnit(): + _, err := stream.ReadBits(it.numBits) if err != nil { return 0, false, err } @@ -248,12 +254,12 @@ func (it *TimestampIterator) readDeltaOfDelta( cb = (cb << 1) | nextCB if cb == buckets[i].Opcode() { - dodBits, err := stream.ReadBits(buckets[i].NumValueBits()) + dodBits, err := stream.ReadBits(uint(buckets[i].NumValueBits())) if err != nil { return 0, err } - dod := encoding.SignExtend(dodBits, buckets[i].NumValueBits()) + dod := encoding.SignExtend(dodBits, uint(buckets[i].NumValueBits())) timeUnit, err := it.TimeUnit.Value() if err != nil { return 0, nil @@ -263,7 +269,7 @@ func (it *TimestampIterator) readDeltaOfDelta( } } - numValueBits := tes.DefaultBucket().NumValueBits() + numValueBits := uint(tes.DefaultBucket().NumValueBits()) dodBits, err := stream.ReadBits(numValueBits) if err != nil { return 0, err @@ -310,7 +316,7 @@ func (it *TimestampIterator) readVarint(stream encoding.IStream) (int, error) { return int(res), err } -func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits int) (uint64, bool) { +func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits uint) (uint64, bool) { res, err := stream.PeekBits(numBits) if err != nil { return 0, false diff --git a/src/dbnode/encoding/options.go b/src/dbnode/encoding/options.go index 68c9251536..1016be3d53 100644 --- a/src/dbnode/encoding/options.go +++ b/src/dbnode/encoding/options.go @@ -30,7 +30,7 @@ import ( const ( defaultDefaultTimeUnit = xtime.Second defaultByteFieldDictLRUSize = 4 - defaultIStreamReaderSizeM3TSZ = 16 + defaultIStreamReaderSizeM3TSZ = 8 * 2 defaultIStreamReaderSizeProto = 128 ) diff --git a/src/dbnode/encoding/proto/int_encoder_iterator.go b/src/dbnode/encoding/proto/int_encoder_iterator.go index 1daee6bd93..53dc4074eb 100644 --- a/src/dbnode/encoding/proto/int_encoder_iterator.go +++ b/src/dbnode/encoding/proto/int_encoder_iterator.go @@ -224,7 +224,7 @@ func (eit *intEncoderAndIterator) readIntValDiff(stream encoding.IStream) error itErrPrefix, err) } - numSig := int(eit.intSigBitsTracker.NumSig) + numSig := uint(eit.intSigBitsTracker.NumSig) diffSigBits, err := stream.ReadBits(numSig) if err != nil { return fmt.Errorf( diff --git a/src/dbnode/encoding/proto/iterator.go b/src/dbnode/encoding/proto/iterator.go index 4aa6e79c91..54dbb68eda 100644 --- a/src/dbnode/encoding/proto/iterator.go +++ b/src/dbnode/encoding/proto/iterator.go @@ -336,7 +336,7 @@ func (it *iterator) readCustomFieldsSchema() error { } for i := 1; i <= int(numCustomFields); i++ { - fieldTypeBits, err := it.stream.ReadBits(numBitsToEncodeCustomType) + fieldTypeBits, err := it.stream.ReadBits(uint(numBitsToEncodeCustomType)) if err != nil { return err } @@ -546,7 +546,7 @@ func (it *iterator) readBytesValue(i int, customField customFieldState) error { if valueInDictControlBit == opCodeInterpretSubsequentBitsAsLRUIndex { dictIdxBits, err := it.stream.ReadBits( - numBitsRequiredForNumUpToN(it.byteFieldDictLRUSize)) + uint(numBitsRequiredForNumUpToN(it.byteFieldDictLRUSize))) if err != nil { return fmt.Errorf( "%s error trying to read bytes dict idx: %v", @@ -861,7 +861,7 @@ func (it *iterator) nextToBeEvicted(fieldIdx int) []byte { return dict[0] } -func (it *iterator) readBits(numBits int) (uint64, error) { +func (it *iterator) readBits(numBits uint) (uint64, error) { res, err := it.stream.ReadBits(numBits) if err != nil { return 0, err diff --git a/src/dbnode/encoding/types.go b/src/dbnode/encoding/types.go index 92ed280817..106eb89f64 100644 --- a/src/dbnode/encoding/types.go +++ b/src/dbnode/encoding/types.go @@ -332,9 +332,9 @@ type IStream interface { Read([]byte) (int, error) ReadBit() (Bit, error) ReadByte() (byte, error) - ReadBits(numBits int) (uint64, error) - PeekBits(numBits int) (uint64, error) - RemainingBitsInCurrentByte() int + ReadBits(numBits uint) (uint64, error) + PeekBits(numBits uint) (uint64, error) + RemainingBitsInCurrentByte() uint Reset(r io.Reader) } diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index 5b523045a2..02b7af1568 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -398,6 +398,7 @@ func (t iterType) name(name string) string { } type reset func() +type stop func() // newTestOptions provides options with very small/non-existent pools // so that memory profiles don't get cluttered with pooled allocated objects. @@ -423,7 +424,7 @@ func newTestOptions() Options { return newOptions(bytesPool, iteratorPools) } -func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { +func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset, stop) { var ( seriesCount = 1000 replicasCount = 3 @@ -509,6 +510,39 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { } usePools := t == stepParallel + + opts := newTestOptions() + if usePools { + poolOpts := xsync.NewPooledWorkerPoolOptions() + readWorkerPools, err := xsync.NewPooledWorkerPool(1024, poolOpts) + require.NoError(b, err) + readWorkerPools.Init() + opts = opts.SetReadWorkerPool(readWorkerPools) + } + + for _, reset := range itersReset { + reset() + } + + block, err := NewEncodedBlock(iters, models.Bounds{ + Start: start, + StepSize: stepSize, + Duration: window, + }, false, block.NewResultMetadata(), opts) + + require.NoError(b, err) + return block, func() { + for _, reset := range itersReset { + reset() + } + }, + setupProf(usePools, iterations) +} + +func setupProf(usePools bool, iterations int) stop { + var prof interface { + Stop() + } if os.Getenv("PROFILE_TEST_CPU") == "true" { key := profileTakenKey{ profile: "cpu", @@ -516,8 +550,7 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { iterations: iterations, } if v := profilesTaken[key]; v == 2 { - p := profile.Start(profile.CPUProfile) - defer p.Stop() + prof = profile.Start(profile.CPUProfile) } profilesTaken[key] = profilesTaken[key] + 1 @@ -531,42 +564,22 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { } if v := profilesTaken[key]; v == 2 { - p := profile.Start(profile.MemProfile) - defer p.Stop() + prof = profile.Start(profile.MemProfile) } profilesTaken[key] = profilesTaken[key] + 1 } - - opts := newTestOptions() - if usePools { - poolOpts := xsync.NewPooledWorkerPoolOptions() - readWorkerPools, err := xsync.NewPooledWorkerPool(1024, poolOpts) - require.NoError(b, err) - readWorkerPools.Init() - opts = opts.SetReadWorkerPool(readWorkerPools) - } - - for _, reset := range itersReset { - reset() - } - - block, err := NewEncodedBlock(iters, models.Bounds{ - Start: start, - StepSize: stepSize, - Duration: window, - }, false, block.NewResultMetadata(), opts) - - require.NoError(b, err) - return block, func() { - for _, reset := range itersReset { - reset() + return func() { + if prof != nil { + prof.Stop() } } } func benchmarkNextIteration(b *testing.B, iterations int, t iterType) { - bl, reset := setupBlock(b, iterations, t) + bl, reset, close := setupBlock(b, iterations, t) + defer close() + if t == seriesSequential { it, err := bl.SeriesIter() require.NoError(b, err)