workerpool: generic result type for worker pool (#46185)
ref #46258
tangenta authored Aug 21, 2023
1 parent 7b5f48b commit bc88e13
Showing 5 changed files with 162 additions and 54 deletions.
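
The change threads a second type parameter through the pool: Worker[T] becomes Worker[T, R], HandleTask now returns an R, and the pool forwards that value to an optional result channel exposed via GetResultChan. Pools whose workers produce nothing use the new placeholder type workerpool.None, in which case no result channel is allocated. Construction is also decoupled from startup: NewWorkerPool no longer registers with the resource manager or spawns workers, so call sites invoke Start() explicitly. A minimal sketch of the reshaped interface; the Worker definition mirrors the diff below, while doubler is a hypothetical worker invented for illustration:

package main

import "fmt"

// Worker mirrors the two-parameter interface introduced in this commit:
// HandleTask consumes a task of type T and returns a result of type R.
type Worker[T, R any] interface {
	HandleTask(task T) R
	Close()
}

// doubler is a hypothetical worker whose task and result are both int.
type doubler struct{}

func (doubler) HandleTask(task int) int { return task * 2 }
func (doubler) Close()                  {}

func main() {
	var w Worker[int, int] = doubler{}
	fmt.Println(w.HandleTask(21)) // prints 42
}
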
11 changes: 6 additions & 5 deletions ddl/backfilling_scheduler.go
@@ -268,7 +268,7 @@ type ingestBackfillScheduler struct {
 
 	copReqSenderPool *copReqSenderPool
 
-	writerPool  *workerpool.WorkerPool[idxRecResult]
+	writerPool  *workerpool.WorkerPool[idxRecResult, workerpool.None]
 	writerMaxID int
 	poolErr     chan error
 	backendCtx  ingest.BackendCtx
@@ -308,12 +308,12 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
 	}
 	b.copReqSenderPool = copReqSenderPool
 	readerCnt, writerCnt := b.expectedWorkerSize()
-	skipReg := workerpool.OptionSkipRegister[idxRecResult]{}
 	writerPool, err := workerpool.NewWorkerPool[idxRecResult]("ingest_writer",
-		poolutil.DDL, writerCnt, b.createWorker, skipReg)
+		poolutil.DDL, writerCnt, b.createWorker)
 	if err != nil {
 		return errors.Trace(err)
 	}
+	writerPool.Start()
 	b.writerPool = writerPool
 	b.copReqSenderPool.chunkSender = writerPool
 	b.copReqSenderPool.adjustSize(readerCnt)
@@ -382,7 +382,7 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error {
 	return nil
 }
 
-func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult] {
+func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult, workerpool.None] {
 	reorgInfo := b.reorgInfo
 	job := reorgInfo.Job
 	sessCtx, err := newSessCtx(reorgInfo)
@@ -447,7 +447,7 @@ func (*ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize
 	return readerSize, writerSize
 }
 
-func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
+func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) (_ workerpool.None) {
 	defer util.Recover(metrics.LabelDDL, "ingestWorker.HandleTask", func() {
 		w.resultCh <- &backfillResult{taskID: rs.id, err: dbterror.ErrReorgPanic}
 	}, false)
@@ -494,6 +494,7 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
 		ResultCounterForTest.Add(1)
 	}
 	w.resultCh <- result
+	return
 }
 
 func (*addIndexIngestWorker) Close() {}
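
A note on the HandleTask signature above: the result is declared as a named blank, (_ workerpool.None), so the existing function bodies only need the bare return added at the end rather than constructing a value at every exit point. A tiny self-contained illustration of the idiom, with a hypothetical fireAndForget standing in for the real worker method:

package main

import "fmt"

// None stands in for workerpool.None: an empty struct carrying no data.
type None struct{}

// The blank named result (_ None) lets a bare return satisfy the
// signature; the zero value None{} is returned implicitly.
func fireAndForget(msg string) (_ None) {
	fmt.Println(msg)
	return
}

func main() {
	fireAndForget("no result to report")
}
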
8 changes: 5 additions & 3 deletions executor/executor.go
@@ -2350,7 +2350,7 @@ func getCheckSum(ctx context.Context, se sessionctx.Context, sql string) ([]grou
 }
 
 // HandleTask implements the Worker interface.
-func (w *checkIndexWorker) HandleTask(task checkIndexTask) {
+func (w *checkIndexWorker) HandleTask(task checkIndexTask) (_ workerpool.None) {
 	defer w.e.wg.Done()
 	idxInfo := w.indexInfos[task.indexOffset]
 	bucketSize := int(CheckTableFastBucketSize.Load())
@@ -2688,12 +2688,13 @@ func (w *checkIndexWorker) HandleTask(task checkIndexTask) {
 			}
 		}
 	}
+	return
 }
 
 // Close implements the Worker interface.
 func (*checkIndexWorker) Close() {}
 
-func (e *FastCheckTableExec) createWorker() workerpool.Worker[checkIndexTask] {
+func (e *FastCheckTableExec) createWorker() workerpool.Worker[checkIndexTask, workerpool.None] {
 	return &checkIndexWorker{sctx: e.Ctx(), dbName: e.dbName, table: e.table, indexInfos: e.indexInfos, e: e}
 }
 
@@ -2711,10 +2712,11 @@ func (e *FastCheckTableExec) Next(context.Context, *chunk.Chunk) error {
 	}()
 
 	workerPool, err := workerpool.NewWorkerPool[checkIndexTask]("checkIndex",
-		poolutil.CheckTable, 3, e.createWorker, workerpool.OptionSkipRegister[checkIndexTask]{})
+		poolutil.CheckTable, 3, e.createWorker)
 	if err != nil {
 		return errors.Trace(err)
 	}
+	workerPool.Start()
 
 	e.wg.Add(len(e.indexInfos))
 	for i := range e.indexInfos {
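
Both call sites updated here follow the same pattern: drop OptionSkipRegister, construct the pool, then call Start() explicitly (the component argument is now ignored, per the unused util.Component parameter in the workerpool diff below). A hypothetical end-to-end use of the API as it stands after this commit; squareWorker is invented, the workerpool and poolutil identifiers come from the changed files, and the sketch is not guaranteed to compile against any particular TiDB revision:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/resourcemanager/pool/workerpool"
	poolutil "github.com/pingcap/tidb/resourcemanager/util"
)

// squareWorker is a hypothetical worker with a real (non-None) result
// type, so the pool allocates a result channel on Start.
type squareWorker struct{}

func (squareWorker) HandleTask(task int) int { return task * task }
func (squareWorker) Close()                  {}

func main() {
	pool, err := workerpool.NewWorkerPool[int, int]("square", poolutil.DDL, 2,
		func() workerpool.Worker[int, int] { return squareWorker{} })
	if err != nil {
		panic(err)
	}
	pool.Start() // construction no longer starts workers; callers do
	go func() {
		for i := 1; i <= 4; i++ {
			pool.AddTask(i)
		}
	}()
	sum := 0
	for i := 0; i < 4; i++ {
		sum += <-pool.GetResultChan()
	}
	fmt.Println(sum) // 1 + 4 + 9 + 16 = 30
	pool.ReleaseAndWait()
}
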
3 changes: 2 additions & 1 deletion resourcemanager/pool/workerpool/BUILD.bazel
@@ -7,7 +7,6 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//metrics",
-        "//resourcemanager",
         "//resourcemanager/util",
         "//util",
         "//util/syncutil",
@@ -25,11 +24,13 @@ go_test(
     embed = [":workerpool"],
     flaky = True,
     race = "on",
+    shard_count = 3,
     deps = [
         "//resourcemanager/util",
         "//testkit/testsetup",
         "//util/logutil",
         "@com_github_stretchr_testify//require",
+        "@org_golang_x_sync//errgroup",
         "@org_uber_go_goleak//:goleak",
         "@org_uber_go_zap//:zap",
     ],
98 changes: 58 additions & 40 deletions resourcemanager/pool/workerpool/workerpool.go
@@ -18,93 +18,106 @@ import (
 	"time"
 
 	"github.com/pingcap/tidb/metrics"
-	"github.com/pingcap/tidb/resourcemanager"
 	"github.com/pingcap/tidb/resourcemanager/util"
 	tidbutil "github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/syncutil"
 	atomicutil "go.uber.org/atomic"
 )
 
 // Worker is worker interface.
-type Worker[T any] interface {
-	HandleTask(task T)
+type Worker[T, R any] interface {
+	HandleTask(task T) R
 	Close()
 }
 
 // WorkerPool is a pool of workers.
-type WorkerPool[T any] struct {
+type WorkerPool[T, R any] struct {
 	name          string
 	numWorkers    int32
 	originWorkers int32
 	runningTask   atomicutil.Int32
 	taskChan      chan T
+	resChan       chan R
 	quitChan      chan struct{}
 	wg            tidbutil.WaitGroupWrapper
-	createWorker  func() Worker[T]
+	createWorker  func() Worker[T, R]
 	lastTuneTs    atomicutil.Time
 	mu            syncutil.RWMutex
-	skipRegister  bool
 }
 
 // Option is the config option for WorkerPool.
-type Option[T any] interface {
-	Apply(pool *WorkerPool[T])
+type Option[T, R any] interface {
+	Apply(pool *WorkerPool[T, R])
 }
 
-// OptionSkipRegister is an option to skip register the worker pool to resource manager.
-type OptionSkipRegister[T any] struct{}
-
-// Apply implements the Option interface.
-func (OptionSkipRegister[T]) Apply(pool *WorkerPool[T]) {
-	pool.skipRegister = true
-}
+// None is a type placeholder for the worker pool that does not have a result receiver.
+type None struct{}
 
 // NewWorkerPool creates a new worker pool.
-func NewWorkerPool[T any](name string, component util.Component, numWorkers int,
-	createWorker func() Worker[T], opts ...Option[T]) (*WorkerPool[T], error) {
+func NewWorkerPool[T, R any](name string, _ util.Component, numWorkers int,
+	createWorker func() Worker[T, R], opts ...Option[T, R]) (*WorkerPool[T, R], error) {
 	if numWorkers <= 0 {
 		numWorkers = 1
 	}
 
-	p := &WorkerPool[T]{
+	p := &WorkerPool[T, R]{
 		name:          name,
 		numWorkers:    int32(numWorkers),
 		originWorkers: int32(numWorkers),
-		taskChan:      make(chan T),
 		quitChan:      make(chan struct{}),
-		createWorker:  createWorker,
 	}
 
 	for _, opt := range opts {
 		opt.Apply(p)
 	}
 
-	if !p.skipRegister {
-		err := resourcemanager.InstanceResourceManager.Register(p, name, component)
-		if err != nil {
-			return nil, err
-		}
-	}
+	p.createWorker = createWorker
+	return p, nil
+}
+
+// SetTaskReceiver sets the task receiver for the pool.
+func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T) {
+	p.taskChan = recv
+}
+
+// SetResultSender sets the result sender for the pool.
+func (p *WorkerPool[T, R]) SetResultSender(sender chan R) {
+	p.resChan = sender
+}
+
+// Start starts default count of workers.
+func (p *WorkerPool[T, R]) Start() {
+	if p.taskChan == nil {
+		p.taskChan = make(chan T)
+	}
+
+	if p.resChan == nil {
+		var zero R
+		var r interface{} = zero
+		if _, ok := r.(None); !ok {
+			p.resChan = make(chan R)
+		}
+	}
 
-	// Start default count of workers.
 	for i := 0; i < int(p.numWorkers); i++ {
 		p.runAWorker()
 	}
-
-	return p, nil
 }
 
-func (p *WorkerPool[T]) handleTaskWithRecover(w Worker[T], task T) {
+func (p *WorkerPool[T, R]) handleTaskWithRecover(w Worker[T, R], task T) {
 	p.runningTask.Add(1)
 	defer func() {
 		p.runningTask.Add(-1)
 	}()
 	defer tidbutil.Recover(metrics.LabelWorkerPool, "handleTaskWithRecover", nil, false)
 
-	w.HandleTask(task)
+	r := w.HandleTask(task)
+	if p.resChan != nil {
+		p.resChan <- r
+	}
 }
 
-func (p *WorkerPool[T]) runAWorker() {
+func (p *WorkerPool[T, R]) runAWorker() {
 	w := p.createWorker()
 	if w == nil {
 		return // Fail to create worker, quit.
@@ -123,12 +136,17 @@ func (p *WorkerPool[T]) runAWorker() {
 }
 
 // AddTask adds a task to the pool.
-func (p *WorkerPool[T]) AddTask(task T) {
+func (p *WorkerPool[T, R]) AddTask(task T) {
 	p.taskChan <- task
 }
 
+// GetResultChan gets the result channel from the pool.
+func (p *WorkerPool[T, R]) GetResultChan() <-chan R {
+	return p.resChan
+}
+
 // Tune tunes the pool to the specified number of workers.
-func (p *WorkerPool[T]) Tune(numWorkers int32) {
+func (p *WorkerPool[T, R]) Tune(numWorkers int32) {
 	if numWorkers <= 0 {
 		numWorkers = 1
 	}
@@ -151,37 +169,37 @@ func (p *WorkerPool[T]) Tune(numWorkers int32) {
 }
 
 // LastTunerTs returns the last time when the pool was tuned.
-func (p *WorkerPool[T]) LastTunerTs() time.Time {
+func (p *WorkerPool[T, R]) LastTunerTs() time.Time {
 	return p.lastTuneTs.Load()
 }
 
 // Cap returns the capacity of the pool.
-func (p *WorkerPool[T]) Cap() int32 {
+func (p *WorkerPool[T, R]) Cap() int32 {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
 	return p.numWorkers
 }
 
 // Running returns the number of running workers.
-func (p *WorkerPool[T]) Running() int32 {
+func (p *WorkerPool[T, R]) Running() int32 {
 	return p.runningTask.Load()
 }
 
 // Name returns the name of the pool.
-func (p *WorkerPool[T]) Name() string {
+func (p *WorkerPool[T, R]) Name() string {
 	return p.name
 }
 
 // ReleaseAndWait releases the pool and wait for complete.
-func (p *WorkerPool[T]) ReleaseAndWait() {
+func (p *WorkerPool[T, R]) ReleaseAndWait() {
 	close(p.quitChan)
 	p.wg.Wait()
-	if !p.skipRegister {
-		resourcemanager.InstanceResourceManager.Unregister(p.Name())
+	if p.resChan != nil {
+		close(p.resChan)
 	}
 }
 
 // GetOriginConcurrency return the concurrency of the pool at the init.
-func (p *WorkerPool[T]) GetOriginConcurrency() int32 {
+func (p *WorkerPool[T, R]) GetOriginConcurrency() int32 {
 	return p.originWorkers
 }
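
One detail of Start() above is worth spelling out: the pool decides whether R is None, and hence whether to allocate a result channel, without reflection, by boxing the zero value of R in an interface and type-asserting against None. The same check in isolation (hasResult is a hypothetical helper name):

package main

import "fmt"

// None mirrors workerpool.None from the diff above.
type None struct{}

// hasResult reports whether R is a genuine result type, using the same
// zero-value type assertion that WorkerPool.Start performs.
func hasResult[R any]() bool {
	var zero R
	var r interface{} = zero
	_, isNone := r.(None)
	return !isNone
}

func main() {
	fmt.Println(hasResult[int]())  // true: Start would allocate a result channel
	fmt.Println(hasResult[None]()) // false: no result channel is created
}
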