-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
server: handle kill signal during write result to connection #52882
Changes from 16 commits
e17def0
3e8ea59
a822d8d
719d49b
695292d
f2e0dfa
0860b2a
8e9e718
d657643
0c3649d
f662fa1
ee96580
601de00
1587299
372f8fa
4159a99
4868d30
43b6052
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -90,17 +90,23 @@ type processinfoSetter interface { | |||||||||||||
|
||||||||||||||
// recordSet wraps an executor, implements sqlexec.RecordSet interface | ||||||||||||||
type recordSet struct { | ||||||||||||||
fields []*ast.ResultField | ||||||||||||||
executor exec.Executor | ||||||||||||||
fields []*ast.ResultField | ||||||||||||||
executor exec.Executor | ||||||||||||||
// `Fields` may be called after `Close`, and the executor is cleared in the `Close` function, so we need to store the schema in recordSet to avoid a nil pointer dereference. | ||||||||||||||
schema *expression.Schema | ||||||||||||||
wshwsh12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
stmt *ExecStmt | ||||||||||||||
lastErrs []error | ||||||||||||||
txnStartTS uint64 | ||||||||||||||
once sync.Once | ||||||||||||||
// finishLock is a mutex used to synchronize access to the `Next` and `Finish` functions of the adapter. | ||||||||||||||
// It ensures that only one goroutine can access the `Next` and `Finish` functions at a time, preventing race conditions. | ||||||||||||||
// When we terminate the current SQL externally (e.g. kill query), we may use an additional goroutine to call the `Finish` function. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
finishLock sync.Mutex | ||||||||||||||
wshwsh12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (a *recordSet) Fields() []*ast.ResultField { | ||||||||||||||
if len(a.fields) == 0 { | ||||||||||||||
a.fields = colNames2ResultFields(a.executor.Schema(), a.stmt.OutputNames, a.stmt.Ctx.GetSessionVars().CurrentDB) | ||||||||||||||
a.fields = colNames2ResultFields(a.schema, a.stmt.OutputNames, a.stmt.Ctx.GetSessionVars().CurrentDB) | ||||||||||||||
} | ||||||||||||||
return a.fields | ||||||||||||||
} | ||||||||||||||
|
@@ -156,6 +162,14 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { | |||||||||||||
err = util2.GetRecoverError(r) | ||||||||||||||
logutil.Logger(ctx).Error("execute sql panic", zap.String("sql", a.stmt.GetTextToLog(false)), zap.Stack("stack")) | ||||||||||||||
}() | ||||||||||||||
a.finishLock.Lock() | ||||||||||||||
defer a.finishLock.Unlock() | ||||||||||||||
if a.stmt != nil { | ||||||||||||||
err = a.stmt.Ctx.GetSessionVars().SQLKiller.HandleSignal() | ||||||||||||||
wshwsh12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
if err != nil { | ||||||||||||||
return err | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
err = a.stmt.next(ctx, a.executor, req) | ||||||||||||||
if err != nil { | ||||||||||||||
|
@@ -186,16 +200,27 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk { | |||||||||||||
|
||||||||||||||
func (a *recordSet) Finish() error { | ||||||||||||||
var err error | ||||||||||||||
a.once.Do(func() { | ||||||||||||||
err = exec.Close(a.executor) | ||||||||||||||
cteErr := resetCTEStorageMap(a.stmt.Ctx) | ||||||||||||||
if cteErr != nil { | ||||||||||||||
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr)) | ||||||||||||||
} | ||||||||||||||
if err == nil { | ||||||||||||||
err = cteErr | ||||||||||||||
} | ||||||||||||||
}) | ||||||||||||||
if a.finishLock.TryLock() { | ||||||||||||||
defer a.finishLock.Unlock() | ||||||||||||||
a.once.Do(func() { | ||||||||||||||
err = exec.Close(a.executor) | ||||||||||||||
cteErr := resetCTEStorageMap(a.stmt.Ctx) | ||||||||||||||
if cteErr != nil { | ||||||||||||||
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr)) | ||||||||||||||
} | ||||||||||||||
if err == nil { | ||||||||||||||
err = cteErr | ||||||||||||||
} | ||||||||||||||
a.executor = nil | ||||||||||||||
if a.stmt != nil { | ||||||||||||||
status := a.stmt.Ctx.GetSessionVars().SQLKiller.GetKillSignal() | ||||||||||||||
inWriteResultSet := a.stmt.Ctx.GetSessionVars().SQLKiller.InWriteResultSet.Load() | ||||||||||||||
if status > 0 && inWriteResultSet { | ||||||||||||||
logutil.BgLogger().Warn("kill query, this SQL may be stuck in the network I/O stack.", zap.Uint64("conn", a.stmt.Ctx.GetSessionVars().ConnectionID)) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will |
||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
}) | ||||||||||||||
} | ||||||||||||||
if err != nil { | ||||||||||||||
a.lastErrs = append(a.lastErrs, err) | ||||||||||||||
} | ||||||||||||||
|
@@ -336,6 +361,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { | |||||||||||||
|
||||||||||||||
return &recordSet{ | ||||||||||||||
executor: executor, | ||||||||||||||
schema: executor.Schema(), | ||||||||||||||
stmt: a, | ||||||||||||||
txnStartTS: startTs, | ||||||||||||||
}, nil | ||||||||||||||
|
@@ -571,6 +597,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { | |||||||||||||
|
||||||||||||||
return &recordSet{ | ||||||||||||||
executor: e, | ||||||||||||||
schema: e.Schema(), | ||||||||||||||
stmt: a, | ||||||||||||||
txnStartTS: txnStartTS, | ||||||||||||||
}, nil | ||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -907,6 +907,12 @@ func (s *Server) Kill(connectionID uint64, query bool, maxExecutionTime bool) { | |||||||
// Mark the client connection status as WaitShutdown, when clientConn.Run detect | ||||||||
// this, it will end the dispatch loop and exit. | ||||||||
conn.setStatus(connStatusWaitShutdown) | ||||||||
if conn.bufReadConn != nil { | ||||||||
// When `kill connection` is issued and TiDB is stuck in the network stack while writing packets, setting WriteDeadline lets us exit the network stack quickly and end the SQL. | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
if err := conn.bufReadConn.SetWriteDeadline(time.Now()); err != nil { | ||||||||
wshwsh12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
logutil.BgLogger().Warn("error setting write deadline for kill.", zap.Error(err)) | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
killQuery(conn, maxExecutionTime) | ||||||||
} | ||||||||
|
@@ -940,6 +946,7 @@ func killQuery(conn *clientConn, maxExecutionTime bool) { | |||||||
logutil.BgLogger().Warn("error setting read deadline for kill.", zap.Error(err)) | ||||||||
} | ||||||||
} | ||||||||
sessVars.SQLKiller.FinishResultSet() | ||||||||
} | ||||||||
|
||||||||
// KillSysProcesses kill sys processes such as auto analyze. | ||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -111,10 +111,19 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) { | |||||||||||
zap.String("sql text", fmt.Sprintf("%.100v", info.Info)), | ||||||||||||
zap.Int64("sql memory usage", info.MemTracker.BytesConsumed())) | ||||||||||||
s.lastLogTime = time.Now() | ||||||||||||
|
||||||||||||
if seconds := time.Since(s.killStartTime) / time.Second; seconds >= 60 { | ||||||||||||
// If the kill has not taken effect after 60 seconds, the current SQL may be stuck in the network stack while writing packets. | ||||||||||||
// In that case, we reclaim its resources by calling `Finish` and start looking for the next SQL with large memory usage. | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
logutil.BgLogger().Warn(fmt.Sprintf("global memory controller failed to kill the top-consumer in %ds, try to close the executors force", seconds)) | ||||||||||||
wshwsh12 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
s.sessionTracker.Killer.FinishResultSet() | ||||||||||||
goto Succ | ||||||||||||
} | ||||||||||||
} | ||||||||||||
return | ||||||||||||
} | ||||||||||||
} | ||||||||||||
Succ: | ||||||||||||
s.reset() | ||||||||||||
IsKilling.Store(false) | ||||||||||||
memory.MemUsageTop1Tracker.CompareAndSwap(s.sessionTracker, nil) | ||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -41,11 +41,47 @@ const ( | |||||||||
type SQLKiller struct { | ||||||||||
Signal killSignal | ||||||||||
ConnID uint64 | ||||||||||
Finish func() | ||||||||||
// InWriteResultSet is used to mark whether the query is calling clientConn.writeResultSet(). | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
// If the query is in writeResultSet, and Finish() can acquire rs.finishLock, we can assume the query is waiting for network IO. | ||||||||||
wshwsh12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
InWriteResultSet atomic.Bool | ||||||||||
} | ||||||||||
|
||||||||||
// SendKillSignal sends a kill signal to the query. | ||||||||||
func (killer *SQLKiller) SendKillSignal(reason killSignal) { | ||||||||||
atomic.CompareAndSwapUint32(&killer.Signal, 0, reason) | ||||||||||
if atomic.CompareAndSwapUint32(&killer.Signal, 0, reason) { | ||||||||||
status := atomic.LoadUint32(&killer.Signal) | ||||||||||
err := killer.getKillError(status) | ||||||||||
logutil.BgLogger().Warn("kill query started", zap.Uint64("conn", killer.ConnID), zap.String("reason", err.Error())) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. logutil.BgLogger().Warn("kill query initiated", zap.Uint64("connection ID", killer.ConnID), zap.String("reason", err.Error())) |
||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// GetKillSignal gets the kill signal. | ||||||||||
func (killer *SQLKiller) GetKillSignal() killSignal { | ||||||||||
return atomic.LoadUint32(&killer.Signal) | ||||||||||
} | ||||||||||
|
||||||||||
// getKillError gets the error according to the kill signal. | ||||||||||
func (killer *SQLKiller) getKillError(status killSignal) error { | ||||||||||
switch status { | ||||||||||
case QueryInterrupted: | ||||||||||
return exeerrors.ErrQueryInterrupted.GenWithStackByArgs() | ||||||||||
case MaxExecTimeExceeded: | ||||||||||
return exeerrors.ErrMaxExecTimeExceeded.GenWithStackByArgs() | ||||||||||
case QueryMemoryExceeded: | ||||||||||
return exeerrors.ErrMemoryExceedForQuery.GenWithStackByArgs(killer.ConnID) | ||||||||||
case ServerMemoryExceeded: | ||||||||||
return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(killer.ConnID) | ||||||||||
} | ||||||||||
return nil | ||||||||||
} | ||||||||||
|
||||||||||
// FinishResultSet is used to finish the result set. | ||||||||||
// If the cancel signal is received and the SQL is waiting for network IO, its resources can be released first. | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. // FinishResultSet is used to close the result set. |
||||||||||
func (killer *SQLKiller) FinishResultSet() { | ||||||||||
if killer.Finish != nil { | ||||||||||
killer.Finish() | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// HandleSignal handles the kill signal and returns the error. | ||||||||||
|
@@ -61,22 +97,19 @@ func (killer *SQLKiller) HandleSignal() error { | |||||||||
} | ||||||||||
}) | ||||||||||
status := atomic.LoadUint32(&killer.Signal) | ||||||||||
switch status { | ||||||||||
case QueryInterrupted: | ||||||||||
return exeerrors.ErrQueryInterrupted.GenWithStackByArgs() | ||||||||||
case MaxExecTimeExceeded: | ||||||||||
return exeerrors.ErrMaxExecTimeExceeded.GenWithStackByArgs() | ||||||||||
case QueryMemoryExceeded: | ||||||||||
return exeerrors.ErrMemoryExceedForQuery.GenWithStackByArgs(killer.ConnID) | ||||||||||
case ServerMemoryExceeded: | ||||||||||
err := killer.getKillError(status) | ||||||||||
if status == ServerMemoryExceeded { | ||||||||||
logutil.BgLogger().Warn("global memory controller, NeedKill signal is received successfully", | ||||||||||
zap.Uint64("conn", killer.ConnID)) | ||||||||||
return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(killer.ConnID) | ||||||||||
} | ||||||||||
return nil | ||||||||||
return err | ||||||||||
} | ||||||||||
|
||||||||||
// Reset resets the SQLKiller. | ||||||||||
func (killer *SQLKiller) Reset() { | ||||||||||
if atomic.LoadUint32(&killer.Signal) != 0 { | ||||||||||
logutil.BgLogger().Warn("kill query finished", zap.Uint64("conn", killer.ConnID)) | ||||||||||
} | ||||||||||
atomic.StoreUint32(&killer.Signal, 0) | ||||||||||
killer.Finish = nil | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.