support indexing doc even if partially analyzed
- While analyzing a doc, analysis of a few fields can fail.
- We want to index the part of the doc for which analysis succeeded.
moshaad7 committed Oct 12, 2023
1 parent 0dc93ee commit 39ff270
Showing 2 changed files with 38 additions and 70 deletions.
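Before the diffs, a high-level sketch of the new contract: analysis no longer aborts the whole document on the first field error; it collects per-field errors so the remaining fields can still be indexed. The types below are simplified stand-ins for bleve's, not the real API:

package main

import "fmt"

// field is a simplified stand-in for bleve's index.Field.
type field struct {
	name string
	err  error // simulated analysis outcome for this field
}

func (f field) Analyze() error { return f.err }

// analyze mirrors the shape of the change: collect per-field errors
// instead of aborting the whole document on the first failure.
func analyze(fields []field) map[string]error {
	rv := make(map[string]error)
	for _, f := range fields {
		if err := f.Analyze(); err != nil {
			rv[f.name] = err
		}
	}
	return rv
}

func main() {
	doc := []field{
		{name: "title"},
		{name: "geo", err: fmt.Errorf("bad shape")},
		{name: "body"},
	}
	// Only "geo" failed; "title" and "body" can still be indexed.
	fmt.Println(analyze(doc)) // map[geo:bad shape]
}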
9 changes: 0 additions & 9 deletions document/document.go
@@ -124,15 +124,6 @@ func (d *Document) VisitFields(visitor index.FieldVisitor) {
}
}

func (d *Document) VisitFieldsAdv(visitor index.FieldVisitorAdv) {
for _, f := range d.Fields {
stop := visitor(f)
if stop {
break
}
}
}

func (d *Document) VisitComposite(visitor index.CompositeFieldVisitor) {
for _, f := range d.CompositeFields {
visitor(f)

Check failure on line 129 in document/document.go
GitHub Actions / test (1.18.x, 1.19.x, and 1.20.x on ubuntu-latest and macos-latest)

cannot use f (variable of type *CompositeField) as index.CompositeField value in argument to visitor: *CompositeField does not implement index.CompositeField (wrong type for method Analyze)
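All five failures are the same Go interface-satisfaction error: the Analyze method on document.CompositeField no longer matches the signature that index.CompositeField expects. A minimal, illustrative reproduction of this class of error, using stand-in type names rather than the actual bleve definitions (the direction of the signature drift is an assumption):

package main

import "fmt"

// Analyzer plays the role of index.CompositeField here: the interface
// expects Analyze to return an error.
type Analyzer interface {
	Analyze() error
}

// CompositeField plays the role of document.CompositeField, whose
// Analyze signature was not updated to match.
type CompositeField struct{}

// Wrong signature: no error return, so *CompositeField does not
// satisfy Analyzer.
func (c *CompositeField) Analyze() {}

func visit(a Analyzer) { fmt.Println(a) }

func main() {
	f := &CompositeField{}
	f.Analyze()
	// visit(f) // uncommenting reproduces the CI failure:
	// cannot use f (variable of type *CompositeField) as Analyzer value
	// in argument to visit: *CompositeField does not implement Analyzer
	// (wrong type for method Analyze)
}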
99 changes: 38 additions & 61 deletions index/scorch/scorch.go
@@ -17,6 +17,7 @@ package scorch
import (
"encoding/json"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
@@ -77,6 +78,13 @@ type Scorch struct {
segPlugin SegmentPlugin

spatialPlugin index.SpatialAnalyzerPlugin

failedAnalysisMutex sync.RWMutex
// note: this can grow unboundedly.
// In the future, we may want to limit the size of this map
// (e.g., only keep the last 1000 failed analyses).
// In addition, we could store the total number of failed analyses so far.
failedAnalysis map[string]map[string]error // docID -> fieldName -> error
}
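The comment above flags that failedAnalysis can grow without bound. Not part of this commit, but one way the suggested "keep only the last 1000 failed analyses, plus a running total" could look as a sketch:

package scorch

import "sync"

// boundedFailures sketches the retention policy suggested in the comment:
// cap the per-doc error map at max entries and keep a lifetime total.
type boundedFailures struct {
	mu    sync.Mutex
	max   int                         // e.g. 1000
	order []string                    // docIDs, oldest first
	byDoc map[string]map[string]error // docID -> fieldName -> error
	total uint64                      // failed analyses seen overall
}

func (b *boundedFailures) record(docID string, fieldsErr map[string]error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.total++
	if _, seen := b.byDoc[docID]; !seen {
		b.order = append(b.order, docID)
	}
	b.byDoc[docID] = fieldsErr
	// Evict the oldest entries once the cap is exceeded.
	for len(b.order) > b.max {
		oldest := b.order[0]
		b.order = b.order[1:]
		delete(b.byDoc, oldest)
	}
}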

// AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process
@@ -112,6 +120,8 @@ func NewScorch(storeName string,
ineligibleForRemoval: map[string]bool{},
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,

failedAnalysis: make(map[string]map[string]error),
}

forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
@@ -361,6 +371,9 @@ func (s *Scorch) Delete(id string) error {
return s.Batch(b)
}

type FieldsAnalysisError map[string]error // field name -> error
type FailedAnalysis map[string]FieldsAnalysisError // docID -> FieldsAnalysisError

// Batch applies a batch of changes to the index atomically
func (s *Scorch) Batch(batch *index.Batch) (err error) {
start := time.Now()
@@ -370,12 +383,6 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
}()

resultChan := make(chan index.Document, len(batch.IndexOps))
// docIDs of failed doc
type failedDoc struct {
id string
err error
}
failedDocs := make(chan *failedDoc, len(batch.IndexOps))

var numUpdates uint64
var numDeletes uint64
@@ -397,68 +404,41 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

if numUpdates > 0 {
go func() {
var wg sync.WaitGroup
for k := range batch.IndexOps {
doc := batch.IndexOps[k]
if doc != nil {
// put the work on the queue
wg.Add(1)
s.analysisQueue.Queue(func() {
defer wg.Done()
errAnalyze := analyze(doc, s.setSpatialAnalyzerPlugin)
if errAnalyze != nil {
failedDocs <- &failedDoc{id: k, err: errAnalyze}
return
fieldsError := analyze(doc, s.setSpatialAnalyzerPlugin)
if len(fieldsError) > 0 {
s.failedAnalysisMutex.Lock()
s.failedAnalysis[doc.ID()] = fieldsError
s.failedAnalysisMutex.Unlock()

log.Printf("AnalysisReport: docID: %s, fieldsError: %v",
doc.ID(), fieldsError)
}

resultChan <- doc
})
}
}
wg.Wait()
close(resultChan)
close(failedDocs)
}()
} else {
close(resultChan)
close(failedDocs)
}

// # Setup routines to handle analysis results and failed docs
var wg sync.WaitGroup
wg.Add(2)

// handle analysis result
// wait for analysis result
analysisResults := make([]index.Document, int(numUpdates))
analysisResults = analysisResults[:0]
var itemsDeQueued uint64
var totalAnalysisSize int
go func() {
defer wg.Done()

for result := range resultChan {
resultSize := result.Size()
atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
totalAnalysisSize += resultSize
analysisResults = append(analysisResults, result)
}
}()

// handle failed docs
failedResults := make([]*failedDoc, 0, len(batch.IndexOps))
failedResults = failedResults[:0]
go func() {
defer wg.Done()
for failedDoc := range failedDocs {
failedResults = append(failedResults, failedDoc)
}
}()

wg.Wait()

// todo: change the interface of bleve_index_api.Index to return failedDocs
for _, failedDoc := range failedResults {
fmt.Println("failed doc:", failedDoc.id, failedDoc.err)
}

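// Every non-nil update now sends exactly one result to resultChan,
// even when some fields failed analysis, so draining numUpdates
// items (and only then closing the channel) cannot block.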
for itemsDeQueued < numUpdates {
result := <-resultChan
resultSize := result.Size()
atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
totalAnalysisSize += resultSize
analysisResults[itemsDeQueued] = result
itemsDeQueued++
}
close(resultChan)
defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize))

atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))
@@ -700,18 +680,17 @@ func (s *Scorch) setSpatialAnalyzerPlugin(f index.Field) {
}
}

func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error {
var analyzeErr error
d.VisitFieldsAdv(func(field index.Field) bool {
func analyze(d index.Document, fn customAnalyzerPluginInitFunc) map[string]error {
rv := make(map[string]error)
d.VisitFields(func(field index.Field) {
if field.Options().IsIndexed() {
if fn != nil {
fn(field)
}

err := field.Analyze()
if err != nil {
analyzeErr = err
return true // stop visiting further fields
rv[field.Name()] = err
}

if d.HasComposite() && field.Name() != "_id" {
@@ -721,11 +700,9 @@ func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error {
})
}
}

return false
})

return analyzeErr
return rv
}
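The commit records failures in Scorch.failedAnalysis but does not yet expose them through the index API (see the removed TODO in Batch above). A hypothetical accessor, not part of this commit, that reads the recorded errors back under the new RWMutex:

// FailedAnalysisFor is a hypothetical accessor (not part of this commit)
// that assumes the failedAnalysis / failedAnalysisMutex fields added to
// Scorch in this diff. It returns a copy so callers cannot mutate the
// internal map.
func (s *Scorch) FailedAnalysisFor(docID string) map[string]error {
	s.failedAnalysisMutex.RLock()
	defer s.failedAnalysisMutex.RUnlock()
	fieldsErr, ok := s.failedAnalysis[docID]
	if !ok {
		return nil
	}
	rv := make(map[string]error, len(fieldsErr))
	for field, err := range fieldsErr {
		rv[field] = err
	}
	return rv
}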

func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
