Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: enhance graceful stop by closing connections after finish the ongoing txn (#32111) (#48905) #48989

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"@org_golang_google_grpc//channelz/service",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//peer",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand Down
33 changes: 17 additions & 16 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func newClientConn(s *Server) *clientConn {
status: connStatusDispatching,
lastActive: time.Now(),
authPlugin: mysql.AuthNativePassword,
quit: make(chan struct{}),
ppEnabled: s.cfg.ProxyProtocol.Networks != "",
}
}
Expand Down Expand Up @@ -215,6 +216,8 @@ type clientConn struct {
sync.RWMutex
cancelFunc context.CancelFunc
}
// quit is closed once clientConn exits Run().
quit chan struct{}
extensions *extension.SessionExtensions

// Proxy Protocol Enabled
Expand Down Expand Up @@ -1093,6 +1096,12 @@ func (cc *clientConn) Run(ctx context.Context) {
terror.Log(err)
metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc()
}
if atomic.LoadInt32(&cc.status) != connStatusShutdown {
err := cc.Close()
terror.Log(err)
}

close(cc.quit)
}()

// Usually, client connection status changes between [dispatching] <=> [reading].
Expand All @@ -1101,6 +1110,13 @@ func (cc *clientConn) Run(ctx context.Context) {
// The client connection would detect the events when it fails to change status
// by CAS operation, it would then take some actions accordingly.
for {
// Close the connection between transactions when the server is about to shut down.
if cc.server.inShutdownMode.Load() {
if !cc.ctx.GetSessionVars().InTxn() {
return
}
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) ||
// The check below should never be hit,
// but keep it as a reminder and as a code reference for connStatusWaitShutdown.
Expand All @@ -1110,6 +1126,7 @@ func (cc *clientConn) Run(ctx context.Context) {

cc.alloc.Reset()
// close connection when idle time is more than wait_timeout
// default 28800 (8h). FIXME: should not block here when we kill the connection.
waitTimeout := cc.getSessionVarsWaitTimeout(ctx)
cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second)
start := time.Now()
Expand Down Expand Up @@ -1196,22 +1213,6 @@ func (cc *clientConn) Run(ctx context.Context) {
}
}

// ShutdownOrNotify tries to move this client connection straight into the
// shutdown state and, when that is not possible, leaves a notification behind.
// It returns true only when the status was switched from connStatusReading to
// connStatusShutdown by this call.
func (cc *clientConn) ShutdownOrNotify() bool {
	// A connection whose status carries the in-transaction flag is left untouched.
	if inTxn := (cc.ctx.Status() & mysql.ServerStatusInTrans) > 0; inTxn {
		return false
	}
	// A connection that is currently reading can safely be shut down right away.
	shutdown := atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusShutdown)
	if !shutdown {
		// The connection is not reading (e.g. it is dispatching), so it cannot be
		// shut down immediately. Record connStatusWaitShutdown as a notification
		// for the loop in clientConn.Run to pick up.
		atomic.StoreInt32(&cc.status, connStatusWaitShutdown)
	}
	return shutdown
}

func errStrForLog(err error, enableRedactLog bool) string {
if enableRedactLog {
// currently, only ErrParse is considered when enableRedactLog because it may contain sensitive information like
Expand Down
25 changes: 0 additions & 25 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,31 +778,6 @@ func TestShutDown(t *testing.T) {
require.Equal(t, executor.ErrQueryInterrupted, err)
}

// TestShutdownOrNotify exercises clientConn.ShutdownOrNotify starting from the
// wait-shutdown, reading and dispatching statuses, checking the returned flag
// and the status left on the connection.
func TestShutdownOrNotify(t *testing.T) {
	mockStore := testkit.CreateMockStore(t)
	sess, err := session.CreateSession4Test(mockStore)
	require.NoError(t, err)

	conn := &clientConn{
		connectionID: 1,
		server:       &Server{capability: defaultCapability},
		status:       connStatusWaitShutdown,
	}
	conn.setCtx(&TiDBContext{
		Session: sess,
		stmts:   make(map[int]*TiDBStatement),
	})

	// Starting from wait-shutdown: the call reports that nothing was shut down.
	require.False(t, conn.ShutdownOrNotify())

	// Starting from reading: the connection is shut down immediately.
	conn.status = connStatusReading
	require.True(t, conn.ShutdownOrNotify())
	require.Equal(t, connStatusShutdown, conn.status)

	// Starting from dispatching: the connection is only marked as waiting for shutdown.
	conn.status = connStatusDispatching
	require.False(t, conn.ShutdownOrNotify())
	require.Equal(t, connStatusWaitShutdown, conn.status)
}

// snapshotCache is a small test-side interface satisfied by any value that
// exposes a SnapCacheHitCount method.
type snapshotCache interface {
// SnapCacheHitCount returns an int counter.
// NOTE(review): presumably the number of snapshot cache hits — confirm against the implementer.
SnapCacheHitCount() int
}
Expand Down
2 changes: 1 addition & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (s *Server) handleStatus(w http.ResponseWriter, req *http.Request) {
// If the server is in the process of shutting down, return a non-200 status.
// It is important not to return status{} here, because calling s.ConnectionCount()
// acquires a lock that may already be held by the shutdown process.
if s.inShutdownMode {
if !s.health.Load() {
w.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
Loading