Skip to content

Commit

Permalink
executor: fix tidb crash when calling Close and Finish (#54390) (#54416)
Browse files Browse the repository at this point in the history
close #54335
  • Loading branch information
ti-chi-bot authored Jul 3, 2024
1 parent e211099 commit ae83818
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 34 deletions.
43 changes: 17 additions & 26 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ type recordSet struct {
lastErrs []error
txnStartTS uint64
once sync.Once
// finishLock is a mutex used to synchronize access to the `Next` and `Finish` functions of the adapter.
// It ensures that only one goroutine can access the `Next` and `Finish` functions at a time, preventing race conditions.
// When we terminate the current SQL externally (e.g., kill query), an additional goroutine would be used to call the `Finish` function.
finishLock sync.Mutex
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -164,8 +160,6 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = util2.GetRecoverError(r)
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog(false)), zap.Stack("stack"))
}()
a.finishLock.Lock()
defer a.finishLock.Unlock()
if a.stmt != nil {
if err := a.stmt.Ctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
return err
Expand Down Expand Up @@ -201,27 +195,24 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {

func (a *recordSet) Finish() error {
var err error
if a.finishLock.TryLock() {
defer a.finishLock.Unlock()
a.once.Do(func() {
err = exec.Close(a.executor)
cteErr := resetCTEStorageMap(a.stmt.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if err == nil {
err = cteErr
}
a.executor = nil
if a.stmt != nil {
status := a.stmt.Ctx.GetSessionVars().SQLKiller.GetKillSignal()
inWriteResultSet := a.stmt.Ctx.GetSessionVars().SQLKiller.InWriteResultSet.Load()
if status > 0 && inWriteResultSet {
logutil.BgLogger().Warn("kill, this SQL might be stuck in the network stack while writing packets to the client.", zap.Uint64("connection ID", a.stmt.Ctx.GetSessionVars().ConnectionID))
}
a.once.Do(func() {
err = exec.Close(a.executor)
cteErr := resetCTEStorageMap(a.stmt.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if err == nil {
err = cteErr
}
a.executor = nil
if a.stmt != nil {
status := a.stmt.Ctx.GetSessionVars().SQLKiller.GetKillSignal()
inWriteResultSet := a.stmt.Ctx.GetSessionVars().SQLKiller.InWriteResultSet.Load()
if status > 0 && inWriteResultSet {
logutil.BgLogger().Warn("kill, this SQL might be stuck in the network stack while writing packets to the client.", zap.Uint64("connection ID", a.stmt.Ctx.GetSessionVars().ConnectionID))
}
})
}
}
})
if err != nil {
a.lastErrs = append(a.lastErrs, err)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2059,12 +2059,14 @@ func (cc *clientConn) handleStmt(
if cc.getStatus() == connStatusShutdown {
return false, exeerrors.ErrQueryInterrupted
}
cc.ctx.GetSessionVars().SQLKiller.Finish = func() {
//nolint: errcheck
rs.Finish()
}
cc.ctx.GetSessionVars().SQLKiller.SetFinishFunc(
func() {
//nolint: errcheck
rs.Finish()
})
cc.ctx.GetSessionVars().SQLKiller.InWriteResultSet.Store(true)
defer cc.ctx.GetSessionVars().SQLKiller.InWriteResultSet.Store(false)
defer cc.ctx.GetSessionVars().SQLKiller.ClearFinishFunc()
if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil {
return retryable, err
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2125,3 +2125,52 @@ func TestConnAddMetrics(t *testing.T) {
cc.addMetrics(mysql.ComStmtExecute, time.Now(), nil)
re.Equal(promtestutils.ToFloat64(counter.WithLabelValues("StmtExecute", "OK", "test_rg2")), 1.0)
}

func TestIssue54335(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

// There is no underlying netCon, use failpoint to avoid panic
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/FakeClientConn", "return(1)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/FakeClientConn"))
}()
tk := testkit.NewTestKit(t, store)

connID := uint64(1)
tk.Session().SetConnectionID(connID)
tc := &TiDBContext{
Session: tk.Session(),
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: connID,
server: &Server{
capability: defaultCapability,
},
alloc: arena.NewAllocator(32 * 1024),
chunkAlloc: chunk.NewAllocator(),
}
cc.SetCtx(tc)
srv := &Server{
clients: map[uint64]*clientConn{
connID: cc,
},
dom: dom,
}
handle := dom.ExpensiveQueryHandle().SetSessionManager(srv)
go handle.Run()

tk.MustExec("use test;")
tk.MustExec("CREATE TABLE testTable2 (id bigint, age int)")
str := fmt.Sprintf("insert into testTable2 values(%d, %d)", 1, 1)
tk.MustExec(str)
for i := 0; i < 14; i++ {
tk.MustExec("insert into testTable2 select * from testTable2")
}

times := 100
for ; times > 0; times-- {
// Test with -race
_ = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(1)*/ * FROM testTable2;")
}
}
16 changes: 14 additions & 2 deletions pkg/server/internal/resultset/resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package resultset

import (
"context"
"sync"
"sync/atomic"

"github.com/pingcap/tidb/pkg/parser/terror"
Expand Down Expand Up @@ -54,24 +55,35 @@ type tidbResultSet struct {
preparedStmt *core.PlanCacheStmt
columns []*column.Info
closed int32
// finishLock is a mutex used to synchronize access to the `Next`,`Finish` and `Close` functions of the adapter.
// It ensures that only one goroutine can access the `Next`,`Finish` and `Close` functions at a time, preventing race conditions.
// When we terminate the current SQL externally (e.g., kill query), an additional goroutine would be used to call the `Finish` function.
finishLock sync.Mutex
}

func (trs *tidbResultSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return trs.recordSet.NewChunk(alloc)
}

func (trs *tidbResultSet) Next(ctx context.Context, req *chunk.Chunk) error {
trs.finishLock.Lock()
defer trs.finishLock.Unlock()
return trs.recordSet.Next(ctx, req)
}

func (trs *tidbResultSet) Finish() error {
if x, ok := trs.recordSet.(interface{ Finish() error }); ok {
return x.Finish()
if trs.finishLock.TryLock() {
defer trs.finishLock.Unlock()
if x, ok := trs.recordSet.(interface{ Finish() error }); ok {
return x.Finish()
}
}
return nil
}

func (trs *tidbResultSet) Close() {
trs.finishLock.Lock()
defer trs.finishLock.Unlock()
if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) {
return
}
Expand Down
24 changes: 22 additions & 2 deletions pkg/util/sqlkiller/sqlkiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sqlkiller

import (
"math/rand"
"sync"
"sync/atomic"

"github.com/pingcap/failpoint"
Expand All @@ -41,7 +42,11 @@ const (
type SQLKiller struct {
Signal killSignal
ConnID uint64
Finish func()
// FinishFuncLock is used to ensure that Finish is not called and modified at the same time.
// An external call to the Finish function only allows when the main goroutine to be in the writeResultSet process.
// When the main goroutine exits the writeResultSet process, the Finish function will be cleared.
FinishFuncLock sync.Mutex
Finish func()
// InWriteResultSet is used to indicate whether the query is currently calling clientConn.writeResultSet().
// If the query is in writeResultSet and Finish() can acquire rs.finishLock, we can assume the query is waiting for the client to receive data from the server over network I/O.
InWriteResultSet atomic.Bool
Expand Down Expand Up @@ -80,11 +85,27 @@ func (killer *SQLKiller) getKillError(status killSignal) error {
// If a kill signal is sent but the SQL query is stuck in the network stack while writing packets to the client,
// encountering some bugs that cause it to hang, or failing to detect the kill signal, we can call Finish to release resources used during the SQL execution process.
func (killer *SQLKiller) FinishResultSet() {
killer.FinishFuncLock.Lock()
defer killer.FinishFuncLock.Unlock()
if killer.Finish != nil {
killer.Finish()
}
}

// SetFinishFunc sets the finish function.
func (killer *SQLKiller) SetFinishFunc(fn func()) {
killer.FinishFuncLock.Lock()
defer killer.FinishFuncLock.Unlock()
killer.Finish = fn
}

// ClearFinishFunc clears the finish function.1
func (killer *SQLKiller) ClearFinishFunc() {
killer.FinishFuncLock.Lock()
defer killer.FinishFuncLock.Unlock()
killer.Finish = nil
}

// HandleSignal handles the kill signal and return the error.
func (killer *SQLKiller) HandleSignal() error {
failpoint.Inject("randomPanic", func(val failpoint.Value) {
Expand Down Expand Up @@ -112,5 +133,4 @@ func (killer *SQLKiller) Reset() {
logutil.BgLogger().Warn("kill finished", zap.Uint64("conn", killer.ConnID))
}
atomic.StoreUint32(&killer.Signal, 0)
killer.Finish = nil
}

0 comments on commit ae83818

Please sign in to comment.