Skip to content

Commit

Permalink
executor: handle OOM panics that are currently not recovered in the distSQL layer
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and winkyao committed May 20, 2019
1 parent f7899de commit cc0f1ce
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
9 changes: 9 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package distsql

import (
"fmt"
"time"

"github.com/pingcap/errors"
Expand All @@ -25,8 +26,10 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -80,6 +83,12 @@ func (r *selectResult) Fetch(ctx context.Context) {
func (r *selectResult) fetch(ctx context.Context) {
startTime := time.Now()
defer func() {
if c := recover(); c != nil {
err := fmt.Errorf("%v", c)
logutil.Logger(ctx).Error("OOM", zap.Error(err))
r.results <- resultWithErr{err: err}
}

close(r.results)
duration := time.Since(startTime)
metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds())
Expand Down
20 changes: 16 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) {
return
}

func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) {
if worker.memTracker != nil {
func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse, checkOOM bool) (exit bool) {
if worker.memTracker != nil && checkOOM {
worker.memTracker.Consume(int64(resp.MemSize()))
}
select {
Expand Down Expand Up @@ -607,12 +607,24 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {

// handleTask handles single copTask, sends the result to channel, retry automatically on error.
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) {
defer func() {
r := recover()
if r != nil {
logutil.Logger(context.Background()).Error("copIteratorWork meet panic",
zap.Reflect("r", r),
zap.Stack("stack trace"))
resp := &copResponse{err: errors.Errorf("%v", r)}
// if panic has happened, set checkOOM to false to avoid another panic.
worker.sendToRespCh(resp, task.respChan, false)
}
}()

remainTasks := []*copTask{task}
for len(remainTasks) > 0 {
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
if err != nil {
resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh)
worker.sendToRespCh(resp, respCh, true)
return
}
if len(tasks) > 0 {
Expand Down Expand Up @@ -803,7 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copRespo
}
}
}
worker.sendToRespCh(resp, ch)
worker.sendToRespCh(resp, ch, true)
return nil, nil
}

Expand Down

0 comments on commit cc0f1ce

Please sign in to comment.