Skip to content

Commit

Permalink
refactor(repair): rename intensityHandler to intensityParallelHandler
Browse files Browse the repository at this point in the history
This commit also improves intensityParallelHandler.SetIntensity/SetParallel/ReplicaSetMaxIntensity comments.
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 20, 2024
1 parent beb9106 commit 47a87c4
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pkg/service/repair/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestRowLevelRepairController_TryBlock(t *testing.T) {
node5: 16,
node6: 15,
}
defaultIntensityHandler := func() *intensityHandler {
return &intensityHandler{
defaultIntensityHandler := func() *intensityParallelHandler {
return &intensityParallelHandler{
logger: log.Logger{},
maxHostIntensity: maxRangesPerHost,
intensity: atomic.NewInt64(int64(defaultIntensity)),
Expand Down
25 changes: 15 additions & 10 deletions pkg/service/repair/intensity_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type sizeSetter interface {
SetSize(size int)
}

type intensityHandler struct {
type intensityParallelHandler struct {
taskID uuid.UUID
runID uuid.UUID
logger log.Logger
Expand All @@ -33,14 +33,17 @@ const (
chanSize = 10000
)

// SetIntensity sets the value of '--intensity' flag.
func (i *intensityHandler) SetIntensity(ctx context.Context, intensity Intensity) {
// SetIntensity sets the effective '--intensity' value used by newly created repair jobs.
// It can be used to control currently running repair without stopping it and changing '--intensity' flag.
// The change is ephemeral, so it does not change the value of '--intensity' flag in SM DB, meaning that without
// additional actions, it won't be visible in next task runs.
func (i *intensityParallelHandler) SetIntensity(ctx context.Context, intensity Intensity) {
i.logger.Info(ctx, "Setting repair intensity", "value", intensity, "previous", i.intensity.Load())
i.intensity.Store(int64(intensity))
}

// SetParallel sets the value of '--parallel' flag.
func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) {
// SetParallel works in the same way as SetIntensity, but for '--parallel' value.
func (i *intensityParallelHandler) SetParallel(ctx context.Context, parallel int) {
i.logger.Info(ctx, "Setting repair parallel", "value", parallel, "previous", i.parallel.Load())
i.parallel.Store(int64(parallel))
if parallel == defaultParallel {
Expand All @@ -50,7 +53,9 @@ func (i *intensityHandler) SetParallel(ctx context.Context, parallel int) {
}
}

func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity {
// ReplicaSetMaxIntensity returns the max amount of ranges that can be repaired in parallel on given replica set.
// It results in returning min(max_repair_ranges_in_parallel) across nodes from replica set.
func (i *intensityParallelHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity {
out := NewIntensity(math.MaxInt)
for _, rep := range replicaSet {
if ranges := i.maxHostIntensity[rep]; ranges < out {
Expand All @@ -61,21 +66,21 @@ func (i *intensityHandler) ReplicaSetMaxIntensity(replicaSet []string) Intensity
}

// MaxHostIntensity returns max_token_ranges_in_parallel per host.
func (i *intensityHandler) MaxHostIntensity() map[string]Intensity {
func (i *intensityParallelHandler) MaxHostIntensity() map[string]Intensity {
return i.maxHostIntensity
}

// Intensity returns stored value for intensity.
func (i *intensityHandler) Intensity() Intensity {
func (i *intensityParallelHandler) Intensity() Intensity {
return NewIntensity(int(i.intensity.Load()))
}

// MaxParallel returns maximal achievable parallelism.
func (i *intensityHandler) MaxParallel() int {
func (i *intensityParallelHandler) MaxParallel() int {
return i.maxParallel
}

// Parallel returns stored value for parallel.
func (i *intensityHandler) Parallel() int {
func (i *intensityParallelHandler) Parallel() int {
return int(i.parallel.Load())
}
12 changes: 6 additions & 6 deletions pkg/service/repair/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Service struct {
clusterSession cluster.SessionFunc
logger log.Logger

intensityHandlers map[uuid.UUID]*intensityHandler
intensityHandlers map[uuid.UUID]*intensityParallelHandler
mu sync.Mutex
}

Expand All @@ -62,7 +62,7 @@ func NewService(session gocqlx.Session, config Config, metrics metrics.RepairMet
scyllaClient: scyllaClient,
clusterSession: clusterSession,
logger: logger,
intensityHandlers: make(map[uuid.UUID]*intensityHandler),
intensityHandlers: make(map[uuid.UUID]*intensityParallelHandler),
}, nil
}

Expand Down Expand Up @@ -329,8 +329,8 @@ func (s *Service) killAllRepairs(ctx context.Context, client *scyllaclient.Clien

func (s *Service) newIntensityHandler(ctx context.Context, clusterID, taskID, runID uuid.UUID,
maxHostIntensity map[string]Intensity, maxParallel int, poolController sizeSetter,
) (ih *intensityHandler, cleanup func()) {
ih = &intensityHandler{
) (ih *intensityParallelHandler, cleanup func()) {
ih = &intensityParallelHandler{
taskID: taskID,
runID: runID,
logger: s.logger.Named("control"),
Expand Down Expand Up @@ -429,7 +429,7 @@ func (s *Service) SetIntensity(ctx context.Context, clusterID uuid.UUID, intensi
return service.ErrValidate(errors.Errorf("setting invalid intensity value %.2f", intensity))
}
ih.SetIntensity(ctx, NewIntensityFromDeprecated(intensity))

// Preserve applied change in SM DB, so that it will be visible in next task runs
err := table.RepairRun.UpdateBuilder("intensity").Query(s.session).BindMap(qb.M{
"cluster_id": clusterID,
"task_id": ih.taskID,
Expand All @@ -452,7 +452,7 @@ func (s *Service) SetParallel(ctx context.Context, clusterID uuid.UUID, parallel
return service.ErrValidate(errors.Errorf("setting invalid parallel value %d", parallel))
}
ih.SetParallel(ctx, parallel)

// Preserve applied change in SM DB, so that it will be visible in next task runs
err := table.RepairRun.UpdateBuilder("parallel").Query(s.session).BindMap(qb.M{
"cluster_id": clusterID,
"task_id": ih.taskID,
Expand Down

0 comments on commit 47a87c4

Please sign in to comment.