diff --git a/document/document.go b/document/document.go
index 6960b424b..54fd6d442 100644
--- a/document/document.go
+++ b/document/document.go
@@ -124,15 +124,6 @@ func (d *Document) VisitFields(visitor index.FieldVisitor) {
 	}
 }
 
-func (d *Document) VisitFieldsAdv(visitor index.FieldVisitorAdv) {
-	for _, f := range d.Fields {
-		stop := visitor(f)
-		if stop {
-			break
-		}
-	}
-}
-
 func (d *Document) VisitComposite(visitor index.CompositeFieldVisitor) {
 	for _, f := range d.CompositeFields {
 		visitor(f)
diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go
index 340680d66..d86b4a083 100644
--- a/index/scorch/scorch.go
+++ b/index/scorch/scorch.go
@@ -17,6 +17,7 @@ package scorch
 import (
 	"encoding/json"
 	"fmt"
+	"log"
 	"os"
 	"sync"
 	"sync/atomic"
@@ -77,6 +78,13 @@ type Scorch struct {
 	segPlugin SegmentPlugin
 
 	spatialPlugin index.SpatialAnalyzerPlugin
+
+	failedAnalysisMutex sync.RWMutex
+	// NOTE: this map can grow without bound.
+	// In the future we may want to limit its size
+	// (e.g. keep only the last 1000 failed analyses).
+	// We could also track the total number of failed analyses so far.
+	failedAnalysis map[string]map[string]error // docID -> fieldName -> error
 }
 
 // AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process
@@ -112,6 +120,8 @@ func NewScorch(storeName string,
 		ineligibleForRemoval: map[string]bool{},
 		forceMergeRequestCh:  make(chan *mergerCtrl, 1),
 		segPlugin:            defaultSegmentPlugin,
+
+		failedAnalysis: make(map[string]map[string]error),
 	}
 
 	forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
@@ -361,6 +371,9 @@ func (s *Scorch) Delete(id string) error {
 	return s.Batch(b)
 }
 
+type FieldsAnalysisError map[string]error // field name -> error
+type FailedAnalysis map[string]FieldsAnalysisError // docID -> FieldsAnalysisError
+
 // Batch applices a batch of changes to the index atomically
 func (s *Scorch) Batch(batch *index.Batch) (err error) {
 	start := time.Now()
@@ -370,12 +383,6 @@
 	}()
 
 	resultChan := make(chan index.Document, len(batch.IndexOps))
-	// docIDs of failed doc
-	type failedDoc struct {
-		id  string
-		err error
-	}
-	failedDocs := make(chan *failedDoc, len(batch.IndexOps))
 
 	var numUpdates uint64
 	var numDeletes uint64
@@ -397,68 +404,41 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
 
 	if numUpdates > 0 {
 		go func() {
-			var wg sync.WaitGroup
 			for k := range batch.IndexOps {
 				doc := batch.IndexOps[k]
 				if doc != nil {
 					// put the work on the queue
-					wg.Add(1)
 					s.analysisQueue.Queue(func() {
-						defer wg.Done()
-						errAnalyze := analyze(doc, s.setSpatialAnalyzerPlugin)
-						if errAnalyze != nil {
-							failedDocs <- &failedDoc{id: k, err: errAnalyze}
-							return
+						fieldsError := analyze(doc, s.setSpatialAnalyzerPlugin)
+						if len(fieldsError) > 0 {
+							s.failedAnalysisMutex.Lock()
+							s.failedAnalysis[doc.ID()] = fieldsError
+							s.failedAnalysisMutex.Unlock()
+
+							log.Printf("AnalysisReport: docID: %s, fieldsError: %v",
+								doc.ID(), fieldsError)
 						}
+
 						resultChan <- doc
 					})
 				}
 			}
-			wg.Wait()
-			close(resultChan)
-			close(failedDocs)
 		}()
-	} else {
-		close(resultChan)
-		close(failedDocs)
 	}
 
-	// # Setup routines to handle analysis results and failed docs
-	var wg sync.WaitGroup
-	wg.Add(2)
-
-	// handle analysis result
+	// wait for analysis results
 	analysisResults := make([]index.Document, int(numUpdates))
-	analysisResults = analysisResults[:0]
+	var itemsDeQueued uint64
 	var totalAnalysisSize int
-	go func() {
-		defer wg.Done()
-
-		for result := range resultChan {
-			resultSize := result.Size()
-			atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
-			totalAnalysisSize += resultSize
-			analysisResults = append(analysisResults, result)
-		}
-	}()
-
-	// handle failed docs
-	failedResults := make([]*failedDoc, 0, len(batch.IndexOps))
-	failedResults = failedResults[:0]
-	go func() {
-		defer wg.Done()
-		for failedDoc := range failedDocs {
-			failedResults = append(failedResults, failedDoc)
-		}
-	}()
-
-	wg.Wait()
-
-	// todo: change the interface of bleve_index_api.Index to return failedDocs
-	for _, failedDoc := range failedResults {
-		fmt.Println("failed doc:", failedDoc.id, failedDoc.err)
-	}
-
+	for itemsDeQueued < numUpdates {
+		result := <-resultChan
+		resultSize := result.Size()
+		atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
+		totalAnalysisSize += resultSize
+		analysisResults[itemsDeQueued] = result
+		itemsDeQueued++
+	}
+	close(resultChan)
 	defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize))
 
 	atomic.AddUint64(&s.stats.TotAnalysisTime, uint64(time.Since(start)))
@@ -700,9 +680,9 @@ func (s *Scorch) setSpatialAnalyzerPlugin(f index.Field) {
 	}
 }
 
-func analyze(d index.Document, fn customAnalyzerPluginInitFunc) error {
-	var analyzeErr error
-	d.VisitFieldsAdv(func(field index.Field) bool {
+func analyze(d index.Document, fn customAnalyzerPluginInitFunc) map[string]error {
+	rv := make(map[string]error)
+	d.VisitFields(func(field index.Field) {
 		if field.Options().IsIndexed() {
 			if fn != nil {
 				fn(field)
@@ -710,8 +690,7 @@
 
 			err := field.Analyze()
 			if err != nil {
-				analyzeErr = err
-				return true // stop visiting further fields
+				rv[field.Name()] = err
 			}
 
 			if d.HasComposite() && field.Name() != "_id" {
@@ -721,11 +700,9 @@
 				})
 			}
 		}
-
-		return false
 	})
 
-	return analyzeErr
+	return rv
}
 
 func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
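
Note on reading the new state back: the diff records failed analyses under `failedAnalysisMutex`, but adds no accessor, so the map is write-only apart from the `log.Printf`. Below is a minimal sketch of what a guarded read path might look like; `FailedAnalysisFor` is a hypothetical method name, and the standalone `tracker` type merely models the two new `Scorch` fields so the example is runnable on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// tracker models just the two fields the diff adds to the Scorch struct.
type tracker struct {
	failedAnalysisMutex sync.RWMutex
	failedAnalysis      map[string]map[string]error // docID -> fieldName -> error
}

// FailedAnalysisFor is a hypothetical accessor: it returns a copy of the
// per-field errors recorded for docID, so callers cannot observe later
// mutations made under the write lock.
func (t *tracker) FailedAnalysisFor(docID string) (map[string]error, bool) {
	t.failedAnalysisMutex.RLock()
	defer t.failedAnalysisMutex.RUnlock()

	fieldsErr, ok := t.failedAnalysis[docID]
	if !ok {
		return nil, false
	}
	cp := make(map[string]error, len(fieldsErr))
	for field, err := range fieldsErr {
		cp[field] = err
	}
	return cp, true
}

func main() {
	t := &tracker{failedAnalysis: make(map[string]map[string]error)}

	// Simulate what the analysis callback does under the write lock.
	t.failedAnalysisMutex.Lock()
	t.failedAnalysis["doc-1"] = map[string]error{
		"title": errors.New("token stream error"),
	}
	t.failedAnalysisMutex.Unlock()

	if fieldsErr, ok := t.FailedAnalysisFor("doc-1"); ok {
		fmt.Println("doc-1 failed fields:", fieldsErr)
	}
}
```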
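The new struct comment also flags that `failedAnalysis` can grow without bound and suggests capping it (e.g. keeping only the last 1000 failed analyses) plus a running total. One possible shape for that, sketched with FIFO eviction by insertion order; `boundedFailures` and all of its names are illustrative and not part of this change.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// boundedFailures keeps at most maxEntries recent failed analyses,
// evicting the oldest docID first, while counting every failure seen.
type boundedFailures struct {
	mu          sync.Mutex
	maxEntries  int
	order       []string                    // docIDs, oldest first
	byDoc       map[string]map[string]error // docID -> fieldName -> error
	totalFailed uint64
}

func newBoundedFailures(max int) *boundedFailures {
	return &boundedFailures{
		maxEntries: max,
		byDoc:      make(map[string]map[string]error),
	}
}

func (b *boundedFailures) record(docID string, fieldsErr map[string]error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.totalFailed++
	if _, seen := b.byDoc[docID]; !seen {
		b.order = append(b.order, docID)
	}
	b.byDoc[docID] = fieldsErr

	// Evict the oldest entries once the cap is exceeded.
	for len(b.order) > b.maxEntries {
		oldest := b.order[0]
		b.order = b.order[1:]
		delete(b.byDoc, oldest)
	}
}

func main() {
	b := newBoundedFailures(2)
	for _, id := range []string{"a", "b", "c"} {
		b.record(id, map[string]error{"body": errors.New("analysis failed")})
	}
	fmt.Println("tracked docs:", b.order)         // [b c] ("a" evicted)
	fmt.Println("total failures:", b.totalFailed) // 3
}
```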