From 47a87c44dddd96f01b3b0a361c6ba94bf36106e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 12 Jun 2024 12:33:34 +0200 Subject: [PATCH] refactor(repair): rename intensityHandler to intensityParallelHandler This commit also improves intensityParallelHandler.SetIntensity/SetParallel/ReplicaSetMaxIntensity comments. --- pkg/service/repair/controller_test.go | 4 ++-- pkg/service/repair/intensity_handler.go | 25 +++++++++++++++---------- pkg/service/repair/service.go | 12 ++++++------ 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/pkg/service/repair/controller_test.go b/pkg/service/repair/controller_test.go index 2ab90bead..8cc4f5048 100644 --- a/pkg/service/repair/controller_test.go +++ b/pkg/service/repair/controller_test.go @@ -29,8 +29,8 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) { node5: 16, node6: 15, } - defaultIntensityHandler := func() *intensityHandler { - return &intensityHandler{ + defaultIntensityHandler := func() *intensityParallelHandler { + return &intensityParallelHandler{ logger: log.Logger{}, maxHostIntensity: maxRangesPerHost, intensity: atomic.NewInt64(int64(defaultIntensity)), diff --git a/pkg/service/repair/intensity_handler.go b/pkg/service/repair/intensity_handler.go index 4afefa0d2..b84e8c653 100644 --- a/pkg/service/repair/intensity_handler.go +++ b/pkg/service/repair/intensity_handler.go @@ -15,7 +15,7 @@ type sizeSetter interface { SetSize(size int) } -type intensityHandler struct { +type intensityParallelHandler struct { taskID uuid.UUID runID uuid.UUID logger log.Logger @@ -33,14 +33,17 @@ const ( chanSize = 10000 ) -// SetIntensity sets the value of '--intensity' flag. -func (i *intensityHandler) SetIntensity(ctx context.Context, intensity Intensity) { +// SetIntensity sets the effective '--intensity' value used by newly created repair jobs. +// It can be used to control currently running repair without stopping it and changing '--intensity' flag. +// The change is ephemeral, so it does not change the value of '--intensity' flag in SM DB, meaning that without +// additional actions, it won't be visible in next task runs. +func (i *intensityParallelHandler) SetIntensity(ctx context.Context, intensity Intensity) { i.logger.Info(ctx, "Setting repair intensity", "value", intensity, "previous", i.intensity.Load()) i.intensity.Store(int64(intensity)) } -// SetParallel sets the value of '--parallel' flag. -func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) { +// SetParallel works in the same way as SetIntensity, but for '--parallel' value. +func (i *intensityParallelHandler) SetParallel(ctx context.Context, parallel int) { i.logger.Info(ctx, "Setting repair parallel", "value", parallel, "previous", i.parallel.Load()) i.parallel.Store(int64(parallel)) if parallel == defaultParallel { @@ -50,7 +53,9 @@ func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) { } } -func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity { +// ReplicaSetMaxIntensity returns the max amount of ranges that can be repaired in parallel on given replica set. +// It results in returning min(max_repair_ranges_in_parallel) across nodes from replica set. +func (i *intensityParallelHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity { out := NewIntensity(math.MaxInt) for _, rep := range replicaSet { if ranges := i.maxHostIntensity[rep]; ranges < out { @@ -61,21 +66,21 @@ func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity } // MaxHostIntensity returns max_token_ranges_in_parallel per host. -func (i *intensityHandler) MaxHostIntensity() map[string]Intensity { +func (i *intensityParallelHandler) MaxHostIntensity() map[string]Intensity { return i.maxHostIntensity } // Intensity returns stored value for intensity. -func (i *intensityHandler) Intensity() Intensity { +func (i *intensityParallelHandler) Intensity() Intensity { return NewIntensity(int(i.intensity.Load())) } // MaxParallel returns maximal achievable parallelism. -func (i *intensityHandler) MaxParallel() int { +func (i *intensityParallelHandler) MaxParallel() int { return i.maxParallel } // Parallel returns stored value for parallel. -func (i *intensityHandler) Parallel() int { +func (i *intensityParallelHandler) Parallel() int { return int(i.parallel.Load()) } diff --git a/pkg/service/repair/service.go b/pkg/service/repair/service.go index 0abe96198..c52e34933 100644 --- a/pkg/service/repair/service.go +++ b/pkg/service/repair/service.go @@ -40,7 +40,7 @@ type Service struct { clusterSession cluster.SessionFunc logger log.Logger - intensityHandlers map[uuid.UUID]*intensityHandler + intensityHandlers map[uuid.UUID]*intensityParallelHandler mu sync.Mutex } @@ -62,7 +62,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet scyllaClient: scyllaClient, clusterSession: clusterSession, logger: logger, - intensityHandlers: make(map[uuid.UUID]*intensityHandler), + intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler), }, nil } @@ -329,8 +329,8 @@ func (s *Service) killAllRepairs(ctx context.Context, client *scyllaclient.Clien func (s *Service) newIntensityHandler(ctx context.Context, clusterID, taskID, runID uuid.UUID, maxHostIntensity map[string]Intensity, maxParallel int, poolController sizeSetter, -) (ih *intensityHandler, cleanup func()) { - ih = &intensityHandler{ +) (ih *intensityParallelHandler, cleanup func()) { + ih = &intensityParallelHandler{ taskID: taskID, runID: runID, logger: s.logger.Named("control"), @@ -429,7 +429,7 @@ func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensi return service.ErrValidate(errors.Errorf("setting invalid intensity value %.2f", intensity)) } ih.SetIntensity(ctx, NewIntensityFromDeprecated(intensity)) - + // Preserve applied change in SM DB, so that it will be visible in next task runs err := table.RepairRun.UpdateBuilder("intensity").Query(s.session).BindMap(qb.M{ "cluster_id": clusterID, "task_id": ih.taskID, @@ -452,7 +452,7 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel return service.ErrValidate(errors.Errorf("setting invalid parallel value %d", parallel)) } ih.SetParallel(ctx, parallel) - + // Preserve applied change in SM DB, so that it will be visible in next task runs err := table.RepairRun.UpdateBuilder("parallel").Query(s.session).BindMap(qb.M{ "cluster_id": clusterID, "task_id": ih.taskID,