Skip to content

Commit

Permalink
server: start to listen after init stats complete (#51472)
Browse files Browse the repository at this point in the history
close #51473
  • Loading branch information
hawkingrei authored Mar 5, 2024
1 parent 6192717 commit 7f8d394
Show file tree
Hide file tree
Showing 21 changed files with 145 additions and 81 deletions.
5 changes: 4 additions & 1 deletion br/pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func NewCluster() (*Cluster, error) {
// Start runs a mock cluster.
func (mock *Cluster) Start() error {
server.RunInGoTest = true
server.RunInGoTestChan = make(chan struct{})
mock.TiDBDriver = server.NewTiDBDriver(mock.Storage)
cfg := config.NewConfig()
// let tidb random select a port
Expand All @@ -107,6 +108,7 @@ func (mock *Cluster) Start() error {
panic(err1)
}
}()
<-server.RunInGoTestChan
mock.DSN = waitUntilServerOnline("127.0.0.1", cfg.Status.StatusPort)
return nil
}
Expand Down Expand Up @@ -181,7 +183,8 @@ func waitUntilServerOnline(addr string, statusPort uint) string {
}
if retry == retryTime {
log.Panic("failed to connect HTTP status in every 10 ms",
zap.Int("retryTime", retryTime))
zap.Int("retryTime", retryTime),
zap.String("url", statusURL))
}
return strings.SplitAfter(dsn, "/")[0]
}
6 changes: 0 additions & 6 deletions cmd/tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,6 @@ func main() {
storage, dom := createStoreAndDomain(keyspaceName)
svr := createServer(storage, dom)

// Register error API is not thread-safe, the caller MUST NOT register errors after initialization.
// To prevent misuse, set a flag to indicate that register new error will panic immediately.
// For regression of issue like https://github.com/pingcap/tidb/issues/28190
terror.RegisterFinish()

exited := make(chan struct{})
signal.SetupSignalHandler(func() {
svr.Close()
Expand All @@ -317,7 +312,6 @@ func main() {
close(exited)
})
topsql.SetupTopSQL()

terror.MustNil(svr.Run(dom))
<-exited
syncLog()
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

func TestMain(m *testing.M) {
server.RunInGoTest = true
server.RunInGoTestChan = make(chan struct{})
testsetup.SetupForCommonTest()
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/diagnosticspb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_kvproto//pkg/tikvpb",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_sysutil//:sysutil",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/promhttp",
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/handler/extractorhandler/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func TestExtractHandler(t *testing.T) {
dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)

client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
go func() {
err := server.Run(nil)
require.NoError(t, err)
}()
<-server2.RunInGoTestChan
client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
client.WaitUntilServerOnline()
startTime := time.Now()
time.Sleep(time.Second)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/handler/extractorhandler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func TestMain(m *testing.M) {
server.RunInGoTest = true
server.RunInGoTestChan = make(chan struct{})
testsetup.SetupForCommonTest()
topsqlstate.EnableTopSQL()
unistore.CheckResourceTagForTopSQLInGoTest = true
Expand Down
1 change: 1 addition & 0 deletions pkg/server/handler/optimizor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func TestMain(m *testing.M) {
server.RunInGoTest = true
server.RunInGoTestChan = make(chan struct{})
testsetup.SetupForCommonTest()
topsqlstate.EnableTopSQL()
unistore.CheckResourceTagForTopSQLInGoTest = true
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/handler/optimizor/optimize_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ func TestDumpOptimizeTraceAPI(t *testing.T) {
require.NoError(t, err)
server.SetDomain(dom)

client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
go func() {
err := server.Run(nil)
require.NoError(t, err)
}()
<-server2.RunInGoTestChan
client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
client.WaitUntilServerOnline()

statsHandler := optimizor.NewStatsHandler(dom)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/handler/optimizor/plan_replayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func prepareServerAndClientForTest(t *testing.T, store kv.Storage, dom *domain.D
err := srv.Run(nil)
require.NoError(t, err)
}()

<-server.RunInGoTestChan
client.Port = testutil.GetPortFromTCPAddr(srv.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(srv.StatusListenerAddr())
client.WaitUntilServerOnline()
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/handler/optimizor/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func TestDumpStatsAPI(t *testing.T) {
dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)

client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
go func() {
err := server.Run(nil)
require.NoError(t, err)
}()
<-server2.RunInGoTestChan
client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
client.WaitUntilServerOnline()

statsHandler := optimizor.NewStatsHandler(dom)
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,17 +463,18 @@ func (ts *basicHTTPHandlerTestSuite) startServer(t *testing.T) {
cfg.Port = 0
cfg.Status.StatusPort = 0
cfg.Status.ReportStatus = true

server2.RunInGoTestChan = make(chan struct{})
server, err := server2.NewServer(cfg, ts.tidbdrv)
require.NoError(t, err)
ts.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
ts.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
ts.server = server
ts.server.SetDomain(ts.domain)
go func() {
err := server.Run(ts.domain)
require.NoError(t, err)
}()
<-server2.RunInGoTestChan
ts.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
ts.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
ts.WaitUntilServerOnline()

do, err := session.GetDomain(ts.store)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/handler/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func TestMain(m *testing.M) {
server.RunInGoTest = true
server.RunInGoTestChan = make(chan struct{})
testsetup.SetupForCommonTest()
topsqlstate.EnableTopSQL()
unistore.CheckResourceTagForTopSQLInGoTest = true
Expand Down
7 changes: 6 additions & 1 deletion pkg/server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ import (

const defaultStatusPort = 10080

func (s *Server) startStatusHTTP() {
func (s *Server) startStatusHTTP() error {
err := s.initHTTPListener()
if err != nil {
return err
}
go s.startHTTPServer()
return nil
}

func serveError(w http.ResponseWriter, status int, txt string) {
Expand Down
58 changes: 46 additions & 12 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (

"github.com/blacktear23/go-proxyprotocol"
"github.com/pingcap/errors"
"github.com/pingcap/log"
autoid "github.com/pingcap/tidb/pkg/autoid_service"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
Expand Down Expand Up @@ -83,6 +84,8 @@ var (
osVersion string
// RunInGoTest represents whether we are run code in test.
RunInGoTest bool
// RunInGoTestChan is used to control the RunInGoTest.
RunInGoTestChan chan struct{}
)

func init() {
Expand Down Expand Up @@ -289,15 +292,19 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
if s.tlsConfig != nil {
s.capability |= mysql.ClientSSL
}
variable.RegisterStatistics(s)
return s, nil
}

func (s *Server) initTiDBListener() (err error) {
if s.cfg.Host != "" && (s.cfg.Port != 0 || RunInGoTest) {
addr := net.JoinHostPort(s.cfg.Host, strconv.Itoa(int(s.cfg.Port)))
tcpProto := "tcp"
if s.cfg.EnableTCP4Only {
tcpProto = "tcp4"
}
if s.listener, err = net.Listen(tcpProto, addr); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
logutil.BgLogger().Info("server is running MySQL protocol", zap.String("addr", addr))
if RunInGoTest && s.cfg.Port == 0 {
Expand All @@ -307,18 +314,18 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {

if s.cfg.Socket != "" {
if err := cleanupStaleSocket(s.cfg.Socket); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

if s.socket, err = net.Listen("unix", s.cfg.Socket); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
logutil.BgLogger().Info("server is running MySQL protocol", zap.String("socket", s.cfg.Socket))
}

if s.socket == nil && s.listener == nil {
err = errors.New("Server not configured to listen on either -socket or -host and -port")
return nil, errors.Trace(err)
return errors.Trace(err)
}

if s.cfg.ProxyProtocol.Networks != "" {
Expand All @@ -330,7 +337,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
int(s.cfg.ProxyProtocol.HeaderTimeout), s.cfg.ProxyProtocol.Fallbackable)
if err != nil {
logutil.BgLogger().Error("ProxyProtocol networks parameter invalid")
return nil, errors.Trace(err)
return errors.Trace(err)
}
if s.listener != nil {
s.listener = ppListener
Expand All @@ -340,10 +347,13 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
logutil.BgLogger().Info("server is running MySQL protocol (through PROXY protocol)", zap.String("socket", s.cfg.Socket))
}
}
return nil
}

func (s *Server) initHTTPListener() (err error) {
if s.cfg.Status.ReportStatus {
if err = s.listenStatusHTTPServer(); err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
}

Expand All @@ -364,10 +374,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
logutil.BgLogger().Error("Fail to load JWKS from the path", zap.String("jwks", s.cfg.Security.AuthTokenJWKS))
}
}

variable.RegisterStatistics(s)

return s, nil
return
}

func cleanupStaleSocket(socket string) error {
Expand Down Expand Up @@ -426,23 +433,50 @@ func (s *Server) Run(dom *domain.Domain) error {

// Start HTTP API to report tidb info such as TPS.
if s.cfg.Status.ReportStatus {
s.startStatusHTTP()
err := s.startStatusHTTP()
if err != nil {
log.Error("failed to create the server", zap.Error(err), zap.Stack("stack"))
return err
}
}
if config.GetGlobalConfig().Performance.ForceInitStats && dom != nil {
<-dom.StatsHandle().InitStatsDone
}
// If error should be reported and exit the server it can be sent on this
// channel. Otherwise, end with sending a nil error to signal "done"
errChan := make(chan error, 2)
err := s.initTiDBListener()
if err != nil {
log.Error("failed to create the server", zap.Error(err), zap.Stack("stack"))
return err
}
// Register error API is not thread-safe, the caller MUST NOT register errors after initialization.
// To prevent misuse, set a flag to indicate that register new error will panic immediately.
// For regression of issue like https://github.com/pingcap/tidb/issues/28190
terror.RegisterFinish()
go s.startNetworkListener(s.listener, false, errChan)
go s.startNetworkListener(s.socket, true, errChan)
err := <-errChan
if RunInGoTest && !isClosed(RunInGoTestChan) {
close(RunInGoTestChan)
}
err = <-errChan
if err != nil {
return err
}
return <-errChan
}

// isClosed is to check if the channel is closed
func isClosed(ch chan struct{}) bool {
select {
case <-ch:
return true
default:
}

return false
}

func (s *Server) startNetworkListener(listener net.Listener, isUnixSocket bool, errChan chan error) {
if listener == nil {
errChan <- nil
Expand Down
1 change: 1 addition & 0 deletions pkg/server/tests/commontest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func TestMain(m *testing.M) {
server.RunInGoTest = true
server.RunInGoTestChan = make(chan struct{})
testsetup.SetupForCommonTest()
topsqlstate.EnableTopSQL()
unistore.CheckResourceTagForTopSQLInGoTest = true
Expand Down
Loading

0 comments on commit 7f8d394

Please sign in to comment.