Skip to content

Commit

Permalink
executor: Fix query hang on hash aggregate operator (#6982)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored and shenli committed Jul 4, 2018
1 parent 1310996 commit 5601c45
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func (w *HashAggFinalWorker) consumeIntermData(sc *stmtctx.StatementContext) (er

func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) {
groupIter := w.groupSet.NewIterator()
result, ok := <-w.finalResultHolderCh
if !ok {
result, finished := w.receiveFinalResultHolder()
if finished {
return
}
result.Reset()
Expand All @@ -503,15 +503,24 @@ func (w *HashAggFinalWorker) getFinalResult(sc *stmtctx.StatementContext) {
result.AppendRow(w.mutableRow.ToRow())
if result.NumRows() == w.maxChunkSize {
w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
result, ok = <-w.finalResultHolderCh
if !ok {
result, finished = w.receiveFinalResultHolder()
if finished {
return
}
result.Reset()
}
}
}

func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
select {
case <-w.finishCh:
return nil, true
case result, ok := <-w.finalResultHolderCh:
return result, !ok
}
}

func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) {
defer func() {
waitGroup.Done()
Expand Down

0 comments on commit 5601c45

Please sign in to comment.