Skip to content

Commit

Permalink
fix region worker run once
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Mar 5, 2021
1 parent be8fa25 commit c40f154
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
10 changes: 10 additions & 0 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ func BenchmarkResolvedTsClientV1(b *testing.B) {
}

func BenchmarkResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
}

Expand Down Expand Up @@ -494,5 +499,10 @@ func BenchmarkMultiStoreResolvedTsClientV1(b *testing.B) {
}

func BenchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
}
10 changes: 3 additions & 7 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,10 @@ import (
"google.golang.org/grpc"
)

var kvClientTestOnce sync.Once

func Test(t *testing.T) {
kvClientTestOnce.Do(func() {
go func() {
RunWorkerPool(context.Background()) //nolint:errcheck
}()
})
go func() {
RunWorkerPool(context.Background()) //nolint:errcheck
}()
check.TestingT(t)
}

Expand Down
23 changes: 15 additions & 8 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import (
"golang.org/x/time/rate"
)

var regionWorkerPool workerpool.WorkerPool
var (
regionWorkerPool workerpool.WorkerPool
workerPoolOnce sync.Once
)

const (
minRegionStateBucket = 4
Expand Down Expand Up @@ -563,12 +566,16 @@ func RunWorkerPool(ctx context.Context) error {
if !enableKVClientV2 {
return nil
}
// TODO: make the pool size configurable
size := runtime.NumCPU()
regionWorkerPool = workerpool.NewDefaultWorkerPool(size)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
return errors.Trace(regionWorkerPool.Run(ctx))
var err error
workerPoolOnce.Do(func() {
// TODO: make the pool size configurable
size := runtime.NumCPU()
regionWorkerPool = workerpool.NewDefaultWorkerPool(size)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
return errors.Trace(regionWorkerPool.Run(ctx))
})
err = errg.Wait()
})
return errors.Trace(errg.Wait())
return err
}

0 comments on commit c40f154

Please sign in to comment.