From 033eae9e73b527795820ed59a5a2e0282d2a94ad Mon Sep 17 00:00:00 2001 From: xige-16 Date: Mon, 29 Jan 2024 10:17:02 +0800 Subject: [PATCH] enhance: Set segment.maxSize param to 1024M (#30139) issue: #25639 /kind improvement When the number of vector columns increases, the number of rows per segment will decrease. In order to reduce the impact on vector indexing performance, it is necessary to increase the segment max limit. If a collection has multiple vector fields with memory and disk indices on different vector fields, the size limit after segment compaction is the minimum of segment.maxSize and segment.diskSegmentMaxSize. Signed-off-by: xige-16 --------- Signed-off-by: xige-16 --- configs/milvus.yaml | 4 +- internal/datacoord/compaction_trigger.go | 58 ++-- internal/datacoord/compaction_trigger_test.go | 320 +++++++++++++++++- pkg/util/indexparamcheck/index_type.go | 4 + pkg/util/paramtable/component_param.go | 6 +- 5 files changed, 364 insertions(+), 28 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e154a5d7199ab..13571edd7ba03 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -384,9 +384,9 @@ dataCoord: balanceSilentDuration: 300 # The duration before the channelBalancer on datacoord to run balanceInterval: 360 #The interval for the channelBalancer on datacoord to check balance status segment: - maxSize: 512 # Maximum size of a segment in MB + maxSize: 1024 # Maximum size of a segment in MB diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collection which has Disk index - sealProportion: 0.23 + sealProportion: 0.12 # The time of the assignment expiration in ms # Warning! this parameter is an expert variable and closely related to data integrity. Without specific # target and solid understanding of the scenarios, it should not be changed. If it's necessary to alter diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 46c2f7ba63a5a..a8f143d792665 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -28,11 +28,14 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type compactTime struct { @@ -315,31 +318,44 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, collectionID := segments[0].GetCollectionID() indexInfos := t.meta.GetIndexesForCollection(segments[0].GetCollectionID(), "") - isDiskANN := false - for _, indexInfo := range indexInfos { - indexType := getIndexType(indexInfo.IndexParams) - if indexType == indexparamcheck.IndexDISKANN { - // If index type is DiskANN, recalc segment max size here. - isDiskANN = true - newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true) - if err != nil { - return false, err - } - if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() { - log.Info("segment max rows recalculated for DiskANN collection", - zap.Int64("old max rows", segments[0].GetMaxRowNum()), - zap.Int64("new max rows", int64(newMaxRows))) - for _, segment := range segments { - segment.MaxRowNum = int64(newMaxRows) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + collMeta, err := t.handler.GetCollection(ctx, collectionID) + if err != nil { + return false, fmt.Errorf("failed to get collection %d", collectionID) + } + vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema) + fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) { + return t.FieldID, getIndexType(t.IndexParams) + }) + vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool { + if indexType, ok := fieldIndexTypes[field.FieldID]; ok { + return indexparamcheck.IsDiskIndex(indexType) + } + return false + }) + + allDiskIndex := len(vectorFields) == len(vectorFieldsWithDiskIndex) + if allDiskIndex { + // Only if all vector fields index type are DiskANN, recalc segment max size here. + newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true) + if err != nil { + return false, err + } + if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() { + log.Info("segment max rows recalculated for DiskANN collection", + zap.Int64("old max rows", segments[0].GetMaxRowNum()), + zap.Int64("new max rows", int64(newMaxRows))) + for _, segment := range segments { + segment.MaxRowNum = int64(newMaxRows) } } } - // If index type is not DiskANN, recalc segment max size using default policy. - if !isDiskANN && !t.testingOnly { + // If some vector fields index type are not DiskANN, recalc segment max size using default policy. + if !allDiskIndex && !t.testingOnly { newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, false) if err != nil { - return isDiskANN, err + return allDiskIndex, err } if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() { log.Info("segment max rows recalculated for non-DiskANN collection", @@ -350,7 +366,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, } } } - return isDiskANN, nil + return allDiskIndex, nil } func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 4d14186f83059..86da5cfacb4d9 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -33,6 +34,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -1745,7 +1747,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) { assert.True(t, couldDo) // if only 10 bin logs, then disk index won't trigger compaction - info.Statslogs = binlogs[0:20] + info.Statslogs = binlogs[0:40] couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{}) assert.True(t, couldDo) @@ -2224,7 +2226,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { defer s.SetupTest() tr := s.tr s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) + // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) tr.handleSignal(&compactionSignal{ segmentID: 1, @@ -2246,6 +2248,14 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { Properties: map[string]string{ common.CollectionAutoCompactionKey: "bad_value", }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: s.vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, }, nil) tr.handleSignal(&compactionSignal{ segmentID: 1, @@ -2319,6 +2329,30 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { } func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.StartOfUserFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + { + FieldID: common.StartOfUserFieldID + 1, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } s.Run("getCompaction_failed", func() { defer s.SetupTest() tr := s.tr @@ -2342,6 +2376,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { s.compactionHandler.EXPECT().isFull().Return(false) s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Schema: schema, Properties: map[string]string{ common.CollectionAutoCompactionKey: "bad_value", }, @@ -2363,6 +2398,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { s.compactionHandler.EXPECT().isFull().Return(false) s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Schema: schema, Properties: map[string]string{ common.CollectionAutoCompactionKey: "false", }, @@ -2385,6 +2421,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ + Schema: schema, Properties: map[string]string{ common.CollectionAutoCompactionKey: "false", }, @@ -2400,6 +2437,285 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { }) } +// test updateSegmentMaxSize +func Test_compactionTrigger_updateSegmentMaxSize(t *testing.T) { + type fields struct { + meta *meta + allocator allocator + signals chan *compactionSignal + compactionHandler compactionPlanContext + globalTrigger *time.Ticker + } + type args struct { + collectionID int64 + compactTime *compactTime + } + collectionID := int64(2) + vecFieldID1 := int64(201) + vecFieldID2 := int64(202) + segmentInfos := make([]*SegmentInfo, 0) + for i := UniqueID(0); i < 50; i++ { + info := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: i, + CollectionID: collectionID, + }, + segmentIndexes: map[UniqueID]*model.SegmentIndex{ + indexID: { + SegmentID: i, + CollectionID: collectionID, + PartitionID: 1, + NumRows: 100, + IndexID: indexID, + BuildID: i, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + }, + }, + } + segmentInfos = append(segmentInfos, info) + } + segmentsInfo := &SegmentsInfo{ + segments: lo.SliceToMap(segmentInfos, func(t *SegmentInfo) (UniqueID, *SegmentInfo) { + return t.ID, t + }), + } + info := &collectionInfo{ + ID: collectionID, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID1, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + { + FieldID: vecFieldID2, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + }, + } + + tests := []struct { + name string + fields fields + args args + isDiskANN bool + }{ + { + "all mem index", + fields{ + &meta{ + segments: segmentsInfo, + collections: map[int64]*collectionInfo{ + collectionID: info, + }, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collectionID: { + indexID: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID1, + IndexID: indexID, + IndexName: "_default_idx_1", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "HNSW", + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + indexID + 1: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID2, + IndexID: indexID + 1, + IndexName: "_default_idx_2", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "HNSW", + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + }, + newMockAllocator(), + nil, + &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, + nil, + }, + args{ + collectionID, + &compactTime{}, + }, + false, + }, + { + "all disk index", + fields{ + &meta{ + segments: segmentsInfo, + collections: map[int64]*collectionInfo{ + collectionID: info, + }, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collectionID: { + indexID: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID1, + IndexID: indexID, + IndexName: "_default_idx_1", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexDISKANN, + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + indexID + 1: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID2, + IndexID: indexID + 1, + IndexName: "_default_idx_2", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexDISKANN, + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + }, + newMockAllocator(), + nil, + &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, + nil, + }, + args{ + collectionID, + &compactTime{}, + }, + true, + }, + { + "some mme index", + fields{ + &meta{ + segments: segmentsInfo, + collections: map[int64]*collectionInfo{ + collectionID: info, + }, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collectionID: { + indexID: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID1, + IndexID: indexID, + IndexName: "_default_idx_1", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexDISKANN, + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + indexID + 1: { + TenantID: "", + CollectionID: 2, + FieldID: vecFieldID2, + IndexID: indexID + 1, + IndexName: "_default_idx_2", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexHNSW, + }, + }, + IsAutoIndex: false, + UserIndexParams: nil, + }, + }, + }, + }, + newMockAllocator(), + nil, + &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)}, + nil, + }, + args{ + collectionID, + &compactTime{}, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tr := &compactionTrigger{ + meta: tt.fields.meta, + handler: newMockHandlerWithMeta(tt.fields.meta), + allocator: tt.fields.allocator, + signals: tt.fields.signals, + compactionHandler: tt.fields.compactionHandler, + globalTrigger: tt.fields.globalTrigger, + estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex, + estimateNonDiskSegmentPolicy: calBySchemaPolicy, + testingOnly: true, + } + res, err := tr.updateSegmentMaxSize(segmentInfos) + assert.NoError(t, err) + assert.Equal(t, tt.isDiskANN, res) + }) + } +} + func TestCompactionTriggerSuite(t *testing.T) { suite.Run(t, new(CompactionTriggerSuite)) } diff --git a/pkg/util/indexparamcheck/index_type.go b/pkg/util/indexparamcheck/index_type.go index b6fb43049e72d..5bf0ad25d0100 100644 --- a/pkg/util/indexparamcheck/index_type.go +++ b/pkg/util/indexparamcheck/index_type.go @@ -48,3 +48,7 @@ func IsMmapSupported(indexType IndexType) bool { indexType == IndexFaissBinIvfFlat || indexType == IndexHNSW } + +func IsDiskIndex(indexType IndexType) bool { + return indexType == IndexDISKANN +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 869a1ffc63497..21fdec1662eca 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2387,7 +2387,7 @@ func (p *dataCoordConfig) init(base *BaseTable) { p.SegmentMaxSize = ParamItem{ Key: "dataCoord.segment.maxSize", Version: "2.0.0", - DefaultValue: "512", + DefaultValue: "1024", Doc: "Maximum size of a segment in MB", Export: true, } @@ -2396,7 +2396,7 @@ func (p *dataCoordConfig) init(base *BaseTable) { p.DiskSegmentMaxSize = ParamItem{ Key: "dataCoord.segment.diskSegmentMaxSize", Version: "2.0.0", - DefaultValue: "512", + DefaultValue: "2048", Doc: "Maximun size of a segment in MB for collection which has Disk index", Export: true, } @@ -2405,7 +2405,7 @@ func (p *dataCoordConfig) init(base *BaseTable) { p.SegmentSealProportion = ParamItem{ Key: "dataCoord.segment.sealProportion", Version: "2.0.0", - DefaultValue: "0.23", + DefaultValue: "0.12", Export: true, } p.SegmentSealProportion.Init(base.mgr)