From 801022fae6ade2f7c54da751749bbe4c7f83a0f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Mon, 27 May 2024 13:03:09 +0200 Subject: [PATCH] refactor(repair): move intensityHandler to separate file --- pkg/service/repair/intensity_handler.go | 81 +++++++++++++++++++++++++ pkg/service/repair/service.go | 70 --------------------- 2 files changed, 81 insertions(+), 70 deletions(-) create mode 100644 pkg/service/repair/intensity_handler.go diff --git a/pkg/service/repair/intensity_handler.go b/pkg/service/repair/intensity_handler.go new file mode 100644 index 000000000..4afefa0d2 --- /dev/null +++ b/pkg/service/repair/intensity_handler.go @@ -0,0 +1,81 @@ +// Copyright (C) 2024 ScyllaDB + +package repair + +import ( + "context" + "math" + + "github.com/scylladb/go-log" + "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" + "go.uber.org/atomic" +) + +type sizeSetter interface { + SetSize(size int) +} + +type intensityHandler struct { + taskID uuid.UUID + runID uuid.UUID + logger log.Logger + maxHostIntensity map[string]Intensity + intensity *atomic.Int64 + maxParallel int + parallel *atomic.Int64 + poolController sizeSetter +} + +const ( + maxIntensity Intensity = 0 + defaultIntensity Intensity = 1 + defaultParallel = 0 + chanSize = 10000 +) + +// SetIntensity sets the value of '--intensity' flag. +func (i *intensityHandler) SetIntensity(ctx context.Context, intensity Intensity) { + i.logger.Info(ctx, "Setting repair intensity", "value", intensity, "previous", i.intensity.Load()) + i.intensity.Store(int64(intensity)) +} + +// SetParallel sets the value of '--parallel' flag. +func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) { + i.logger.Info(ctx, "Setting repair parallel", "value", parallel, "previous", i.parallel.Load()) + i.parallel.Store(int64(parallel)) + if parallel == defaultParallel { + i.poolController.SetSize(i.maxParallel) + } else { + i.poolController.SetSize(parallel) + } +} + +func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity { + out := NewIntensity(math.MaxInt) + for _, rep := range replicaSet { + if ranges := i.maxHostIntensity[rep]; ranges < out { + out = ranges + } + } + return out +} + +// MaxHostIntensity returns max_token_ranges_in_parallel per host. +func (i *intensityHandler) MaxHostIntensity() map[string]Intensity { + return i.maxHostIntensity +} + +// Intensity returns stored value for intensity. +func (i *intensityHandler) Intensity() Intensity { + return NewIntensity(int(i.intensity.Load())) +} + +// MaxParallel returns maximal achievable parallelism. +func (i *intensityHandler) MaxParallel() int { + return i.maxParallel +} + +// Parallel returns stored value for parallel. +func (i *intensityHandler) Parallel() int { + return int(i.parallel.Load()) +} diff --git a/pkg/service/repair/service.go b/pkg/service/repair/service.go index 019a5b230..3b2590244 100644 --- a/pkg/service/repair/service.go +++ b/pkg/service/repair/service.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "math" "strings" "sync" "time" @@ -453,72 +452,3 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel }).ExecRelease() return errors.Wrap(err, "update db") } - -type sizeSetter interface { - SetSize(size int) -} - -type intensityHandler struct { - taskID uuid.UUID - runID uuid.UUID - logger log.Logger - maxHostIntensity map[string]Intensity - intensity *atomic.Int64 - maxParallel int - parallel *atomic.Int64 - poolController sizeSetter -} - -const ( - maxIntensity Intensity = 0 - defaultIntensity Intensity = 1 - defaultParallel = 0 - chanSize = 10000 -) - -// SetIntensity sets the value of '--intensity' flag. -func (i *intensityHandler) SetIntensity(ctx context.Context, intensity Intensity) { - i.logger.Info(ctx, "Setting repair intensity", "value", intensity, "previous", i.intensity.Load()) - i.intensity.Store(int64(intensity)) -} - -// SetParallel sets the value of '--parallel' flag. -func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) { - i.logger.Info(ctx, "Setting repair parallel", "value", parallel, "previous", i.parallel.Load()) - i.parallel.Store(int64(parallel)) - if parallel == defaultParallel { - i.poolController.SetSize(i.maxParallel) - } else { - i.poolController.SetSize(parallel) - } -} - -func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity { - out := NewIntensity(math.MaxInt) - for _, rep := range replicaSet { - if ranges := i.maxHostIntensity[rep]; ranges < out { - out = ranges - } - } - return out -} - -// MaxHostIntensity returns max_token_ranges_in_parallel per host. -func (i *intensityHandler) MaxHostIntensity() map[string]Intensity { - return i.maxHostIntensity -} - -// Intensity returns stored value for intensity. -func (i *intensityHandler) Intensity() Intensity { - return NewIntensity(int(i.intensity.Load())) -} - -// MaxParallel returns maximal achievable parallelism. -func (i *intensityHandler) MaxParallel() int { - return i.maxParallel -} - -// Parallel returns stored value for parallel. -func (i *intensityHandler) Parallel() int { - return int(i.parallel.Load()) -}