Skip to content

Commit

Permalink
server: refine the connection counter logic (#54546) (#54611)
Browse files Browse the repository at this point in the history
close #54428, close #54545
  • Loading branch information
ti-chi-bot authored Jul 16, 2024
1 parent 71c4dbd commit 88ea385
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 13 deletions.
13 changes: 4 additions & 9 deletions pkg/executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -2854,20 +2853,16 @@ func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error {
}

func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
var name string
if s.Name.L != "" {
if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
}
e.Ctx().GetSessionVars().ResourceGroupName = s.Name.L
name = s.Name.L
} else {
e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
}
newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if originalResourceGroup != newResourceGroup {
metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
name = resourcegroup.DefaultResourceGroupName
}
e.Ctx().GetSessionVars().SetResourceGroupName(name)
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func closeConn(cc *clientConn) error {
logutil.Logger(context.Background()).Debug("could not close connection", zap.Error(err))
}
}

// Close statements and session
// At first, it'll decrese the count of connections in the resource group, update the corresponding gauge.
// Then it'll close the statements and session, which release advisory locks, row locks, etc.
Expand All @@ -375,6 +376,8 @@ func closeConn(cc *clientConn) error {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()

err = ctx.Close()
} else {
metrics.ConnGauge.WithLabelValues(resourcegroup.DefaultResourceGroupName).Dec()
}
})
return err
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/sessionctx/sessionstates",
"//pkg/testkit",
"//pkg/testkit/testenv",
"//pkg/util",
"//pkg/util/versioninfo",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_pingcap_errors//:errors",
Expand Down
95 changes: 95 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package testserverclient
import (
"bytes"
"context"
"crypto/x509"
"database/sql"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -45,8 +46,10 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/versioninfo"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2661,6 +2664,98 @@ func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 0.0)
})

// The connection closed before handshake will not decrease the count below 0.
cli.RunTests(t, func(config *mysql.Config) {
config.User = "randomusername"
}, func(dbt *testkit.DBTestKit) {
_, err := dbt.GetDB().Conn(context.Background())
require.NotNil(t, err)
resourceGroupConnCountReached(t, "default", 0.0)
})

// The resource group set by user authantication info is tracked by the count
cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
// Create a user with resource group
_, err := dbt.GetDB().Exec("CREATE USER 'testuser'@'%' RESOURCE GROUP test;")
require.NoError(t, err)
})
cli.RunTests(t, func(c *mysql.Config) {
c.User = "testuser"
c.DBName = ""
}, func(dbt *testkit.DBTestKit) {
// By default, the resource group is set to `test`
ctx := context.Background()
dbt.GetDB().SetMaxIdleConns(0)

// start 100 connections
conns := make([]*sql.Conn, 100)
for i := 0; i < 100; i++ {
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
conns[i] = conn
}
resourceGroupConnCountReached(t, "test", 100.0)

// close 25 connections
for i := 75; i < 100; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "test", 75.0)

// close the rest of them
for i := 0; i < 75; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "test", 0.0)
})

// The resource group set by `SET SESSION_STATE` will be tracked by the counter
// At first, create a new cert/key pair to encode session state
tempDir := t.TempDir()
certPath := filepath.Join(tempDir, "cert.pem")
keyPath := filepath.Join(tempDir, "key.pem")
err := util.CreateCertificates(certPath, keyPath, 1024, x509.RSA, x509.UnknownSignatureAlgorithm)
require.NoError(t, err)

sessionstates.SetCertPath(certPath)
sessionstates.SetKeyPath(keyPath)
sessionstates.ReloadSigningCert()
cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
ctx := context.Background()
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 1.0)
// Now set the resource group to `test`
_, err = conn.ExecContext(ctx, "set resource group test")
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 1.0)

// Encode the session state
rows, err := conn.QueryContext(ctx, "show session_states")
require.NoError(t, err)
var sessionStates, signInfo string
rows.Next()
err = rows.Scan(&sessionStates, &signInfo)
require.NoError(t, err)
require.NoError(t, rows.Close())

// Now reset the resource group to `default`
_, err = conn.ExecContext(ctx, "set resource group default")
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 1.0)
resourceGroupConnCountReached(t, "test", 0.0)
// Set the session state
sessionStates = strings.ReplaceAll(sessionStates, "\\", "\\\\")
sessionStates = strings.ReplaceAll(sessionStates, "'", "\\'")
_, err = conn.ExecContext(ctx, fmt.Sprintf("set session_states '%s'", sessionStates))
require.NoError(t, err)
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 1.0)
})
}

func (cli *TestServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit {
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
autoid "github.com/pingcap/tidb/pkg/autoid_service"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
Expand Down Expand Up @@ -232,6 +233,7 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
}
cc.setConn(conn)
cc.salt = fastrand.Buf(20)
metrics.ConnGauge.WithLabelValues(resourcegroup.DefaultResourceGroupName).Inc()
return cc
}

Expand Down Expand Up @@ -639,7 +641,6 @@ func (s *Server) registerConn(conn *clientConn) bool {
return false
}
s.clients[conn.connectionID] = conn
metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
return true
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2718,7 +2718,7 @@ func (s *session) Auth(user *auth.UserIdentity, authentication, salt []byte, aut
}

if variable.EnableResourceControl.Load() && info.ResourceGroupName != "" {
s.sessionVars.ResourceGroupName = strings.ToLower(info.ResourceGroupName)
s.sessionVars.SetResourceGroupName(info.ResourceGroupName)
}

if info.InSandBoxMode {
Expand Down
14 changes: 12 additions & 2 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,8 @@ type SessionVars struct {
shardRand *rand.Rand

// Resource group name
// NOTE: all statement relate opeartion should use StmtCtx.ResourceGroupName instead.
// NOTE: all statement relate operation should use StmtCtx.ResourceGroupName instead.
// NOTE: please don't change it directly. Use `SetResourceGroupName`, because it'll need to inc/dec the metrics
ResourceGroupName string

// PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction
Expand Down Expand Up @@ -2685,7 +2686,7 @@ func (s *SessionVars) DecodeSessionStates(_ context.Context, sessionStates *sess
s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues)
s.FoundInPlanCache = sessionStates.FoundInPlanCache
s.FoundInBinding = sessionStates.FoundInBinding
s.ResourceGroupName = sessionStates.ResourceGroupName
s.SetResourceGroupName(sessionStates.ResourceGroupName)
s.HypoIndexes = sessionStates.HypoIndexes
s.HypoTiFlashReplicas = sessionStates.HypoTiFlashReplicas

Expand All @@ -2696,6 +2697,15 @@ func (s *SessionVars) DecodeSessionStates(_ context.Context, sessionStates *sess
return
}

// SetResourceGroupName changes the resource group name and inc/dec the metrics accordingly.
func (s *SessionVars) SetResourceGroupName(groupName string) {
if s.ResourceGroupName != groupName {
metrics.ConnGauge.WithLabelValues(s.ResourceGroupName).Dec()
metrics.ConnGauge.WithLabelValues(groupName).Inc()
}
s.ResourceGroupName = groupName
}

// TableDelta stands for the changed count for one table or partition.
type TableDelta struct {
Delta int64
Expand Down

0 comments on commit 88ea385

Please sign in to comment.