diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index dad1ab9e60e6f..245a7c6375baf 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -94,8 +94,7 @@ func (t *clusteringCompactionTask) Process() bool { Observe(float64(elapse)) } } - // todo debug - log.Info("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState)) + log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState)) return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned } @@ -186,7 +185,7 @@ func (t *clusteringCompactionTask) processPipelining() error { err := t.meta.UpdateSegmentsInfo(operators...) if err != nil { log.Warn("fail to set segment level to L2", zap.Error(err)) - return err + return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err) } if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) { @@ -211,7 +210,7 @@ func (t *clusteringCompactionTask) processExecuting() error { if err != nil || result == nil { if errors.Is(err, merr.ErrNodeNotFound) { log.Warn("GetCompactionPlanResult fail", zap.Error(err)) - // todo reassign node ID + // setNodeID(NullNodeID) to trigger reassign node ID t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return nil } @@ -223,9 +222,9 @@ func (t *clusteringCompactionTask) processExecuting() error { t.result = result result := t.result if len(result.GetSegments()) == 0 { - log.Info("illegal compaction results") - err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) - return err + log.Warn("illegal compaction results, this should not happen") + t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) + return merr.WrapErrCompactionResult("compaction result is empty") } resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 { @@ -247,6 +246,8 @@ func (t *clusteringCompactionTask) processExecuting() error { err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) if err == nil { return t.processFailedOrTimeout() + } else { + return err } } return nil @@ -294,11 +295,11 @@ func (t *clusteringCompactionTask) completeTask() error { if err != nil { return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err) } + var operators []UpdateOperator for _, segID := range t.GetResultSegments() { operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID())) } - err = t.meta.UpdateSegmentsInfo(operators...) if err != nil { return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err) @@ -306,7 +307,7 @@ func (t *clusteringCompactionTask) completeTask() error { err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()) if err != nil { - return err + return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err) } return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) } @@ -315,13 +316,13 @@ func (t *clusteringCompactionTask) processAnalyzing() error { analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID()) if analyzeTask == nil { log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID())) - return errors.New("analyzeTask not found") + return merr.WrapErrAnalyzeTaskNotFound(t.GetAnalyzeTaskID()) // retryable } log.Info("check analyze task state", zap.Int64("id", t.GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String())) switch analyzeTask.State { case indexpb.JobState_JobStateFinished: if analyzeTask.GetCentroidsFile() == "" { - // fake finished vector clustering is not supported in opensource + // not retryable, fake finished vector clustering is not supported in opensource return merr.WrapErrClusteringCompactionNotSupportVector() } else { t.AnalyzeVersion = analyzeTask.GetVersion() @@ -354,7 +355,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error { err := t.meta.UpdateSegmentsInfo(operators...) if err != nil { log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) - return err + return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err) } t.resetSegmentCompacting() @@ -404,14 +405,15 @@ func (t *clusteringCompactionTask) doAnalyze() error { } func (t *clusteringCompactionTask) doCompact() error { + log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) if t.NeedReAssignNodeID() { return errors.New("not assign nodeID") } var err error t.plan, err = t.BuildCompactionRequest() if err != nil { - err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) - return err2 + log.Warn("Failed to BuildCompactionRequest", zap.Error(err)) + return merr.WrapErrBuildCompactionRequestFail(err) // retryable } err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { @@ -460,7 +462,8 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO task := t.ShadowClone(opts...) err := t.saveTaskMeta(task) if err != nil { - return err + log.Warn("Failed to saveTaskMeta", zap.Error(err)) + return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable } t.CompactionTask = task return nil diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 78034ca1ab3d6..d6f39af93b9ff 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -185,6 +185,10 @@ var ( ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2308, true) ErrClusteringCompactionGetCollectionFail = newMilvusError("fail to get collection in compaction", 2309, true) ErrCompactionResultNotFound = newMilvusError("compaction result not found", 2310, false) + ErrAnalyzeTaskNotFound = newMilvusError("analyze task not found", 2311, true) + ErrBuildCompactionRequestFail = newMilvusError("fail to build CompactionRequest", 2312, true) + ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true) + ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false) // General ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index c61a4cc92f238..203e2bb86dbf6 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -1117,3 +1117,23 @@ func WrapErrClusteringCompactionSubmitTaskFail(taskType string, err error) error func WrapErrClusteringCompactionMetaError(operation string, err error) error { return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation)) } + +func WrapErrAnalyzeTaskNotFound(id int64) error { + return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id)) +} + +func WrapErrBuildCompactionRequestFail(err error) error { + return wrapFieldsWithDesc(ErrBuildCompactionRequestFail, err.Error()) +} + +func WrapErrGetCompactionPlanResultFail(err error) error { + return wrapFieldsWithDesc(ErrGetCompactionPlanResultFail, err.Error()) +} + +func WrapErrCompactionResult(msg ...string) error { + err := error(ErrCompactionResult) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +}