executor: add concurrency limit on union executor (#19827) (#19886)
ti-srebot authored Sep 9, 2020
1 parent 125258b commit be055a3
Showing 9 changed files with 74 additions and 30 deletions.
1 change: 1 addition & 0 deletions executor/builder.go
@@ -1588,6 +1588,7 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo
 	}
 	e := &UnionExec{
 		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), childExecs...),
+		concurrency:  b.ctx.GetSessionVars().Concurrency.UnionConcurrency,
 	}
 	return e
 }
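Note: the builder only copies the session setting into the new concurrency field; the executor itself caps the value by its number of children before starting any workers. A minimal standalone sketch of that relationship (plain Go; unionWorkerCount is a name made up here, the real clamp lives in UnionExec.initialize in executor/executor.go below):

// unionWorkerCount mirrors the clamp in UnionExec.initialize: the union
// executor never starts more result pullers than it has children, no matter
// how large tidb_union_concurrency is set.
func unionWorkerCount(sessionConcurrency, childCount int) int {
	if sessionConcurrency > childCount {
		return childCount
	}
	return sessionConcurrency
}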
77 changes: 47 additions & 30 deletions executor/executor.go
@@ -1405,16 +1405,18 @@ func (e *MaxOneRowExec) Next(ctx context.Context, req *chunk.Chunk) error {
 // +-------------+
 type UnionExec struct {
 	baseExecutor
+	concurrency int
+	childIDChan chan int

 	stopFetchData atomic.Value

 	finished      chan struct{}
 	resourcePools []chan *chunk.Chunk
 	resultPool    chan *unionWorkerResult

-	childrenResults []*chunk.Chunk
-	wg              sync.WaitGroup
-	initialized     bool
+	results     []*chunk.Chunk
+	wg          sync.WaitGroup
+	initialized bool
 }

 // unionWorkerResult stores the result for a union worker.
@@ -1437,32 +1439,40 @@ func (e *UnionExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return err
 	}
-	for _, child := range e.children {
-		e.childrenResults = append(e.childrenResults, newFirstChunk(child))
-	}
 	e.stopFetchData.Store(false)
 	e.initialized = false
 	e.finished = make(chan struct{})
 	return nil
 }

 func (e *UnionExec) initialize(ctx context.Context) {
-	e.resultPool = make(chan *unionWorkerResult, len(e.children))
-	e.resourcePools = make([]chan *chunk.Chunk, len(e.children))
-	for i := range e.children {
+	if e.concurrency > len(e.children) {
+		e.concurrency = len(e.children)
+	}
+	for i := 0; i < e.concurrency; i++ {
+		e.results = append(e.results, newFirstChunk(e.children[0]))
+	}
+	e.resultPool = make(chan *unionWorkerResult, e.concurrency)
+	e.resourcePools = make([]chan *chunk.Chunk, e.concurrency)
+	e.childIDChan = make(chan int, len(e.children))
+	for i := 0; i < e.concurrency; i++ {
 		e.resourcePools[i] = make(chan *chunk.Chunk, 1)
-		e.resourcePools[i] <- e.childrenResults[i]
+		e.resourcePools[i] <- e.results[i]
 		e.wg.Add(1)
 		go e.resultPuller(ctx, i)
 	}
+	for i := 0; i < len(e.children); i++ {
+		e.childIDChan <- i
+	}
+	close(e.childIDChan)
 	go e.waitAllFinished()
 }

-func (e *UnionExec) resultPuller(ctx context.Context, childID int) {
+func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
 	result := &unionWorkerResult{
 		err: nil,
 		chk: nil,
-		src: e.resourcePools[childID],
+		src: e.resourcePools[workerID],
 	}
 	defer func() {
 		if r := recover(); r != nil {
@@ -1476,23 +1486,26 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {
 		}
 		e.wg.Done()
 	}()
-	for {
-		if e.stopFetchData.Load().(bool) {
-			return
-		}
-		select {
-		case <-e.finished:
-			return
-		case result.chk = <-e.resourcePools[childID]:
-		}
-		result.err = Next(ctx, e.children[childID], result.chk)
-		if result.err == nil && result.chk.NumRows() == 0 {
-			return
-		}
-		e.resultPool <- result
-		if result.err != nil {
-			e.stopFetchData.Store(true)
-			return
+	for childID := range e.childIDChan {
+		for {
+			if e.stopFetchData.Load().(bool) {
+				return
+			}
+			select {
+			case <-e.finished:
+				return
+			case result.chk = <-e.resourcePools[workerID]:
+			}
+			result.err = Next(ctx, e.children[childID], result.chk)
+			if result.err == nil && result.chk.NumRows() == 0 {
+				e.resourcePools[workerID] <- result.chk
+				break
+			}
+			e.resultPool <- result
+			if result.err != nil {
+				e.stopFetchData.Store(true)
+				return
+			}
 		}
 	}
 }
@@ -1522,12 +1535,16 @@ func (e *UnionExec) Close() error {
 	if e.finished != nil {
 		close(e.finished)
 	}
-	e.childrenResults = nil
+	e.results = nil
 	if e.resultPool != nil {
 		for range e.resultPool {
 		}
 	}
 	e.resourcePools = nil
+	if e.childIDChan != nil {
+		for range e.childIDChan {
+		}
+	}
 	return e.baseExecutor.Close()
 }

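The heart of the change is a bounded worker pool: instead of one resultPuller goroutine per child, UnionExec now starts at most concurrency pullers, and every child ID is queued on the closed childIDChan for the pullers to claim. Below is a standalone sketch of that pattern in ordinary Go (not TiDB code; children and results are placeholders for the child executors and their output chunks):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Stand-ins for the union's child executors.
	children := []string{"c0", "c1", "c2", "c3", "c4", "c5"}
	concurrency := 4
	if concurrency > len(children) {
		// Same clamp as UnionExec.initialize: never more workers than children.
		concurrency = len(children)
	}

	// Queue every child ID up front and close the channel, so each worker's
	// range loop ends once all children have been claimed.
	childIDChan := make(chan int, len(children))
	for i := range children {
		childIDChan <- i
	}
	close(childIDChan)

	results := make(chan string, concurrency)
	var wg sync.WaitGroup
	for w := 0; w < concurrency; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for childID := range childIDChan {
				// In UnionExec this is where the worker keeps calling Next on
				// children[childID] until that child returns an empty chunk.
				results <- fmt.Sprintf("worker %d drained %s", workerID, children[childID])
			}
		}(w)
	}

	// Mirrors waitAllFinished: close the result channel once every worker is done.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}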
12 changes: 12 additions & 0 deletions executor/executor_test.go
@@ -1342,6 +1342,18 @@ func (s *testSuiteP2) TestUnion(c *C) {
 	tk.MustQuery("select count(distinct a), sum(distinct a), avg(distinct a) from (select a from t union all select b from t) tmp;").Check(testkit.Rows("1 1.000 1.0000000"))
 }

+func (s *testSuite2) TestUnionLimit(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists union_limit")
+	tk.MustExec("create table union_limit (id int) partition by hash(id) partitions 30")
+	for i := 0; i < 60; i++ {
+		tk.MustExec(fmt.Sprintf("insert into union_limit values (%d)", i))
+	}
+	// Cover the code for worker count limit in the union executor.
+	tk.MustQuery("select * from union_limit limit 10")
+}
+
 func (s *testSuiteP1) TestNeighbouringProj(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("use test")
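Since the variable is registered with both global and session scope (see sessionctx/variable/sysvar.go below), the limit can be tuned per session. A hedged sketch in the style of the surrounding tests, not part of this commit (it assumes SELECT @@tidb_union_concurrency reports the session value):

func (s *testSuite2) TestUnionConcurrencyVariable(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	// Override the default of 4 (DefTiDBUnionConcurrency) for this session only.
	tk.MustExec("set @@tidb_union_concurrency = 8")
	tk.MustQuery("select @@tidb_union_concurrency").Check(testkit.Rows("8"))
	// Restore the default so later tests see the usual worker limit.
	tk.MustExec("set @@tidb_union_concurrency = 4")
}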
1 change: 1 addition & 0 deletions session/session.go
@@ -1934,6 +1934,7 @@ var builtinGlobalVariable = []string{
 	variable.TiDBHashAggPartialConcurrency,
 	variable.TiDBHashAggFinalConcurrency,
 	variable.TiDBWindowConcurrency,
+	variable.TiDBUnionConcurrency,
 	variable.TiDBBackoffLockFast,
 	variable.TiDBBackOffWeight,
 	variable.TiDBConstraintCheckInPlace,
6 changes: 6 additions & 0 deletions sessionctx/variable/session.go
@@ -743,6 +743,7 @@ func NewSessionVars() *SessionVars {
 		HashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency,
 		HashAggFinalConcurrency:   DefTiDBHashAggFinalConcurrency,
 		WindowConcurrency:         DefTiDBWindowConcurrency,
+		UnionConcurrency:          DefTiDBUnionConcurrency,
 	}
 	vars.MemQuota = MemQuota{
 		MemQuotaQuery: config.GetGlobalConfig().MemQuotaQuery,
@@ -1160,6 +1161,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 		s.HashAggFinalConcurrency = tidbOptPositiveInt32(val, DefTiDBHashAggFinalConcurrency)
 	case TiDBWindowConcurrency:
 		s.WindowConcurrency = tidbOptPositiveInt32(val, DefTiDBWindowConcurrency)
+	case TiDBUnionConcurrency:
+		s.UnionConcurrency = tidbOptPositiveInt32(val, DefTiDBUnionConcurrency)
 	case TiDBDistSQLScanConcurrency:
 		s.DistSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency)
 	case TiDBIndexSerialScanConcurrency:
@@ -1459,6 +1462,9 @@ type Concurrency struct {

 	// IndexSerialScanConcurrency is the number of concurrent index serial scan worker.
 	IndexSerialScanConcurrency int
+
+	// UnionConcurrency is the number of concurrent union worker.
+	UnionConcurrency int
 }

 // MemQuota defines memory quota values.
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
@@ -666,6 +666,7 @@ var defaultSysVars = []*SysVar{
 	{ScopeGlobal | ScopeSession, TiDBHashAggPartialConcurrency, strconv.Itoa(DefTiDBHashAggPartialConcurrency)},
 	{ScopeGlobal | ScopeSession, TiDBHashAggFinalConcurrency, strconv.Itoa(DefTiDBHashAggFinalConcurrency)},
 	{ScopeGlobal | ScopeSession, TiDBWindowConcurrency, strconv.Itoa(DefTiDBWindowConcurrency)},
+	{ScopeGlobal | ScopeSession, TiDBUnionConcurrency, strconv.Itoa(DefTiDBUnionConcurrency)},
 	{ScopeGlobal | ScopeSession, TiDBBackoffLockFast, strconv.Itoa(kv.DefBackoffLockFast)},
 	{ScopeGlobal | ScopeSession, TiDBBackOffWeight, strconv.Itoa(kv.DefBackOffWeight)},
 	{ScopeGlobal | ScopeSession, TiDBRetryLimit, strconv.Itoa(DefTiDBRetryLimit)},
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -285,6 +285,9 @@ const (
 	// tidb_window_concurrency is used for window parallel executor.
 	TiDBWindowConcurrency = "tidb_window_concurrency"

+	// tidb_union_concurrency is used for union executor.
+	TiDBUnionConcurrency = "tidb_union_concurrency"
+
 	// tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds.
 	TiDBBackoffLockFast = "tidb_backoff_lock_fast"

@@ -480,6 +483,7 @@ const (
 	DefTiDBHashAggPartialConcurrency = 4
 	DefTiDBHashAggFinalConcurrency = 4
 	DefTiDBWindowConcurrency = 4
+	DefTiDBUnionConcurrency = 4
 	DefTiDBForcePriority = mysql.NoPriority
 	DefTiDBUseRadixJoin = false
 	DefEnableWindowFunction = true
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil.go
@@ -517,6 +517,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc
 		TiDBHashAggPartialConcurrency,
 		TiDBHashAggFinalConcurrency,
 		TiDBWindowConcurrency,
+		TiDBUnionConcurrency,
 		TiDBDistSQLScanConcurrency,
 		TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
 		TiDBBackoffLockFast, TiDBBackOffWeight,
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil_test.go
@@ -70,6 +70,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
 	c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency)
 	c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency)
 	c.Assert(vars.WindowConcurrency, Equals, DefTiDBWindowConcurrency)
+	c.Assert(vars.UnionConcurrency, Equals, DefTiDBUnionConcurrency)
 	c.Assert(vars.DistSQLScanConcurrency, Equals, DefDistSQLScanConcurrency)
 	c.Assert(vars.MaxChunkSize, Equals, DefMaxChunkSize)
 	c.Assert(vars.DMLBatchSize, Equals, DefDMLBatchSize)