Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [cp24]Saperate L0 and Mix trigger interval #37319

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,14 @@ dataCoord:
taskQueueCapacity: 256 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
single:
ratio:
threshold: 0.2 # The ratio threshold of a segment to trigger a single compaction, default as 0.2
deltalog:
maxsize: 16777216 # The deltalog size of a segment to trigger a single compaction, default as 16MB
maxnum: 200 # The deltalog count of a segment to trigger a compaction, default as 200
expiredlog:
maxsize: 10485760 # The expired log size of a segment to trigger a compaction, default as 10MB
clustering:
enable: true # Enable clustering compaction
autoEnable: false # Enable auto clustering compaction
Expand Down
4 changes: 1 addition & 3 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newCompactionTrigger(
}

func (t *compactionTrigger) start() {
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
Expand All @@ -131,8 +131,6 @@ func (t *compactionTrigger) start() {
default:
// no need to handle err in handleSignal
t.handleSignal(signal)
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ func (m *CompactionTriggerManager) startLoop() {
defer logutil.LogPanic()
defer m.closeWg.Done()

l0Ticker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
l0Ticker := time.NewTicker(Params.DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second))
defer l0Ticker.Stop()
clusteringTicker := time.NewTicker(Params.DataCoordCfg.ClusteringCompactionTriggerInterval.GetAsDuration(time.Second))
defer clusteringTicker.Stop()
singleTicker := time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
singleTicker := time.NewTicker(Params.DataCoordCfg.MixCompactionTriggerInterval.GetAsDuration(time.Second))
defer singleTicker.Stop()
log.Info("Compaction trigger manager start")
for {
Expand Down
60 changes: 44 additions & 16 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3156,25 +3156,29 @@ type dataCoordConfig struct {
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionTaskQueueCapacity ParamItem `refreshable:"false"`

CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
CompactionWorkerParalleTasks ParamItem `refreshable:"true"`
MinSegmentToMerge ParamItem `refreshable:"true"`
MaxSegmentToMerge ParamItem `refreshable:"true"`
SegmentSmallProportion ParamItem `refreshable:"true"`
SegmentCompactableProportion ParamItem `refreshable:"true"`
SegmentExpansionRate ParamItem `refreshable:"true"`
CompactionTimeoutInSeconds ParamItem `refreshable:"true"`
CompactionDropToleranceInSeconds ParamItem `refreshable:"true"`
CompactionGCIntervalInSeconds ParamItem `refreshable:"true"`
CompactionCheckIntervalInSeconds ParamItem `refreshable:"false"`
MixCompactionTriggerInterval ParamItem `refreshable:"false"`
L0CompactionTriggerInterval ParamItem `refreshable:"false"`
GlobalCompactionInterval ParamItem `refreshable:"false"`

SingleCompactionRatioThreshold ParamItem `refreshable:"true"`
SingleCompactionDeltaLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionExpiredLogMaxSize ParamItem `refreshable:"true"`
SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"`
GlobalCompactionInterval ParamItem `refreshable:"false"`
ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

ChannelCheckpointMaxLag ParamItem `refreshable:"true"`
SyncSegmentsInterval ParamItem `refreshable:"false"`

// Clustering Compaction
ClusteringCompactionEnable ParamItem `refreshable:"true"`
Expand Down Expand Up @@ -3541,27 +3545,35 @@ During compaction, the size of segment # of rows is able to exceed segment max #
Key: "dataCoord.compaction.single.ratio.threshold",
Version: "2.0.0",
DefaultValue: "0.2",
Doc: "The ratio threshold of a segment to trigger a single compaction, default as 0.2",
Export: true,
}
p.SingleCompactionRatioThreshold.Init(base.mgr)

p.SingleCompactionDeltaLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxsize",
Version: "2.0.0",
DefaultValue: strconv.Itoa(2 * 1024 * 1024),
DefaultValue: "16777216",
Doc: "The deltalog size of a segment to trigger a single compaction, default as 16MB",
Export: true,
}
p.SingleCompactionDeltaLogMaxSize.Init(base.mgr)

p.SingleCompactionExpiredLogMaxSize = ParamItem{
Key: "dataCoord.compaction.single.expiredlog.maxsize",
Version: "2.0.0",
DefaultValue: "10485760",
Doc: "The expired log size of a segment to trigger a compaction, default as 10MB",
Export: true,
}
p.SingleCompactionExpiredLogMaxSize.Init(base.mgr)

p.SingleCompactionDeltalogMaxNum = ParamItem{
Key: "dataCoord.compaction.single.deltalog.maxnum",
Version: "2.0.0",
DefaultValue: "200",
Doc: "The deltalog count of a segment to trigger a compaction, default as 200",
Export: true,
}
p.SingleCompactionDeltalogMaxNum.Init(base.mgr)

Expand All @@ -3572,6 +3584,22 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.GlobalCompactionInterval.Init(base.mgr)

p.MixCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.mix.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds for trigger mix compaction, default as 60s",
DefaultValue: "60",
}
p.MixCompactionTriggerInterval.Init(base.mgr)

p.L0CompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.levelzero.triggerInterval",
Version: "2.4.15",
Doc: "The time interval in seconds for trigger L0 compaction, default as 10s",
XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
DefaultValue: "10",
}
p.L0CompactionTriggerInterval.Init(base.mgr)

XuanYang-cn marked this conversation as resolved.
Show resolved Hide resolved
p.ChannelCheckpointMaxLag = ParamItem{
Key: "dataCoord.compaction.channelMaxCPLag",
Version: "2.4.0",
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/compaction/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ func (s *CompactionSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()

paramtable.Init()
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "1")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1")
}

func (s *CompactionSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()

paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key)
}

func TestCompaction(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/compaction/l2_single_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ func (s *L2SingleCompactionSuite) TestL2SingleCompaction() {
func TestL2SingleCompaction(t *testing.T) {
paramtable.Init()
// to speed up the test
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key, "10")
paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "0")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key)
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key)

suite.Run(t, new(L2SingleCompactionSuite))
Expand Down
Loading