Skip to content

Commit

Permalink
Rework searchers to execute on a single reader
Browse files Browse the repository at this point in the history
  • Loading branch information
prateek committed Sep 5, 2018
1 parent 1672ba6 commit 948b4d2
Show file tree
Hide file tree
Showing 34 changed files with 539 additions and 668 deletions.
71 changes: 71 additions & 0 deletions src/m3ninx/index/segment/fst/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fst

import (
"bytes"
"testing"

sgmt "github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/stretchr/testify/require"
)

// ToTestSegment returns a FST segment equivalent to the provide mutable segment.
func ToTestSegment(t *testing.T, s sgmt.MutableSegment, opts Options) sgmt.Segment {
return newFSTSegment(t, s, opts)
}

func newFSTSegment(t *testing.T, s sgmt.MutableSegment, opts Options) sgmt.Segment {
_, err := s.Seal()
require.NoError(t, err)

w := NewWriter()
require.NoError(t, w.Reset(s))

var (
docsDataBuffer bytes.Buffer
docsIndexBuffer bytes.Buffer
postingsBuffer bytes.Buffer
fstTermsBuffer bytes.Buffer
fstFieldsBuffer bytes.Buffer
)

require.NoError(t, w.WriteDocumentsData(&docsDataBuffer))
require.NoError(t, w.WriteDocumentsIndex(&docsIndexBuffer))
require.NoError(t, w.WritePostingsOffsets(&postingsBuffer))
require.NoError(t, w.WriteFSTTerms(&fstTermsBuffer))
require.NoError(t, w.WriteFSTFields(&fstFieldsBuffer))

data := SegmentData{
MajorVersion: w.MajorVersion(),
MinorVersion: w.MinorVersion(),
Metadata: w.Metadata(),
DocsData: docsDataBuffer.Bytes(),
DocsIdxData: docsIndexBuffer.Bytes(),
PostingsData: postingsBuffer.Bytes(),
FSTTermsData: fstTermsBuffer.Bytes(),
FSTFieldsData: fstFieldsBuffer.Bytes(),
}
reader, err := NewSegment(data, opts)
require.NoError(t, err)

return reader
}
39 changes: 1 addition & 38 deletions src/m3ninx/index/segment/fst/writer_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func newTestSegments(t *testing.T, docs []doc.Document) (memSeg sgmt.MutableSegm
_, err := s.Insert(d)
require.NoError(t, err)
}
return s, newFSTSegment(t, s)
return s, newFSTSegment(t, s, testOptions)
}

func newTestMemSegment(t *testing.T) sgmt.MutableSegment {
Expand All @@ -390,43 +390,6 @@ func newTestMemSegment(t *testing.T) sgmt.MutableSegment {
return s
}

func newFSTSegment(t *testing.T, s sgmt.MutableSegment) sgmt.Segment {
_, err := s.Seal()
require.NoError(t, err)

w := NewWriter()
require.NoError(t, w.Reset(s))

var (
docsDataBuffer bytes.Buffer
docsIndexBuffer bytes.Buffer
postingsBuffer bytes.Buffer
fstTermsBuffer bytes.Buffer
fstFieldsBuffer bytes.Buffer
)

require.NoError(t, w.WriteDocumentsData(&docsDataBuffer))
require.NoError(t, w.WriteDocumentsIndex(&docsIndexBuffer))
require.NoError(t, w.WritePostingsOffsets(&postingsBuffer))
require.NoError(t, w.WriteFSTTerms(&fstTermsBuffer))
require.NoError(t, w.WriteFSTFields(&fstFieldsBuffer))

data := SegmentData{
MajorVersion: w.MajorVersion(),
MinorVersion: w.MinorVersion(),
Metadata: w.Metadata(),
DocsData: docsDataBuffer.Bytes(),
DocsIdxData: docsIndexBuffer.Bytes(),
PostingsData: postingsBuffer.Bytes(),
FSTTermsData: fstTermsBuffer.Bytes(),
FSTFieldsData: fstFieldsBuffer.Bytes(),
}
reader, err := NewSegment(data, testOptions)
require.NoError(t, err)

return reader
}

func assertSliceOfByteSlicesEqual(t *testing.T, a, b [][]byte) {
require.Equal(t, len(a), len(b), fmt.Sprintf("a = [%s], b = [%s]", pprint(a), pprint(b)))
require.Equal(t, a, b)
Expand Down
2 changes: 1 addition & 1 deletion src/m3ninx/search/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (e *executor) Execute(q search.Query) (doc.Iterator, error) {
return nil, errExecutorClosed
}

s, err := q.Searcher(e.readers)
s, err := q.Searcher()
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion src/m3ninx/search/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestExecutor(t *testing.T) {
rs = index.Readers{r}
)
gomock.InOrder(
q.EXPECT().Searcher(rs).Return(nil, nil),
q.EXPECT().Searcher().Return(nil, nil),

r.EXPECT().Close().Return(nil),
)
Expand Down
52 changes: 19 additions & 33 deletions src/m3ninx/search/executor/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,11 @@
package executor

import (
"errors"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/search"
)

var (
errNotEnoughReaders = errors.New("searcher returned more postings lists than the number of readers")
errTooManyReaders = errors.New("searcher returned less postings lists than the number of readers")
)

type iterator struct {
searcher search.Searcher
readers index.Readers
Expand All @@ -49,12 +42,14 @@ func newIterator(s search.Searcher, rs index.Readers) (doc.Iterator, error) {
it := &iterator{
searcher: s,
readers: rs,
idx: -1,
}

currIter, err := it.nextIter()
currIter, _, err := it.nextIter()
if err != nil {
return nil, err
}

it.currIter = currIter
return it, nil
}
Expand All @@ -79,16 +74,16 @@ func (it *iterator) Next() bool {
return false
}

it.idx++
iter, err := it.nextIter()
iter, hasNext, err := it.nextIter()
if err != nil {
it.err = err
return false
}

if iter == nil {
if !hasNext {
return false
}

it.currIter = iter
}

Expand All @@ -114,32 +109,23 @@ func (it *iterator) Close() error {

// nextIter gets the next document iterator by getting the next postings list from
// the it's searcher and then getting the documents for that postings list from the
// corresponding reader associated with that postings list. It also validates that
// the number of postings lists returned by the searcher is equal to the number of
// readers that the iterator is searching over.
func (it *iterator) nextIter() (doc.Iterator, error) {
if !it.searcher.Next() {
if err := it.searcher.Err(); err != nil {
return nil, err
}

// Check that the Searcher hasn't returned too few postings lists.
if it.idx != len(it.readers) {
return nil, errTooManyReaders
}

return nil, nil
// corresponding reader associated with that postings list.
func (it *iterator) nextIter() (doc.Iterator, bool, error) {
it.idx++
if it.idx >= len(it.readers) {
return nil, false, nil
}

// Check that the Searcher hasn't returned too many postings lists.
if it.idx == len(it.readers) {
return nil, errNotEnoughReaders
reader := it.readers[it.idx]
pl, err := it.searcher.Search(reader)
if err != nil {
return nil, false, err
}

pl := it.searcher.Current()
iter, err := it.readers[it.idx].Docs(pl)
iter, err := reader.Docs(pl)
if err != nil {
return nil, err
return nil, false, err
}
return iter, nil

return iter, true, nil
}
18 changes: 7 additions & 11 deletions src/m3ninx/search/executor/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@ func TestIterator(t *testing.T) {
secondPL := roaring.NewPostingsList()
firstPL.Insert(67)

searcher := search.NewMockSearcher(mockCtrl)
gomock.InOrder(
searcher.EXPECT().Next().Return(true),
searcher.EXPECT().Current().Return(firstPL),
searcher.EXPECT().Next().Return(true),
searcher.EXPECT().Current().Return(secondPL),
searcher.EXPECT().Next().Return(false),
searcher.EXPECT().Err().Return(nil),
)

// Set up Readers.
docs := []doc.Document{
doc.Document{
Expand Down Expand Up @@ -103,9 +93,15 @@ func TestIterator(t *testing.T) {
secondReader := index.NewMockReader(mockCtrl)
gomock.InOrder(
firstReader.EXPECT().Docs(firstPL).Return(firstDocIter, nil),

secondReader.EXPECT().Docs(secondPL).Return(secondDocIter, nil),
)

searcher := search.NewMockSearcher(mockCtrl)
gomock.InOrder(
searcher.EXPECT().Search(firstReader).Return(firstPL, nil),
searcher.EXPECT().Search(secondReader).Return(secondPL, nil),
)

readers := index.Readers{firstReader, secondReader}

// Construct iterator and run tests.
Expand Down
Loading

0 comments on commit 948b4d2

Please sign in to comment.