Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: add more metrics; refine batch handle #590

Merged
merged 7 commits into from
Apr 10, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
_, err := conn.baseConn.ExecuteSQL(ctx, queries, args...)
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...)
failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
errCode, err1 := strconv.ParseUint(val.(string), 10, 16)
if err1 != nil {
Expand All @@ -163,7 +163,10 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost))
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return nil, err
Expand Down
14 changes: 12 additions & 2 deletions loader/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
Subsystem: "loader",
Name: "query_duration_time",
Help: "Bucketed histogram of query time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

txnHistogram = prometheus.NewHistogramVec(
Expand All @@ -42,9 +42,18 @@ var (
Subsystem: "loader",
Name: "txn_duration_time",
Help: "Bucketed histogram of processing time (s) of a txn.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

stmtHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "loader",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})

dataFileGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Expand Down Expand Up @@ -92,6 +101,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(tidbExecutionErrorCounter)
registry.MustRegister(txnHistogram)
registry.MustRegister(queryHistogram)
registry.MustRegister(stmtHistogram)
registry.MustRegister(dataFileGauge)
registry.MustRegister(tableGauge)
registry.MustRegister(dataSizeGauge)
Expand Down
20 changes: 16 additions & 4 deletions pkg/conn/baseconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"database/sql/driver"
"fmt"
"strings"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

Expand Down Expand Up @@ -114,7 +116,7 @@ func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...int
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil)
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error) {
// inject an error to trigger retry, this should be placed before the real execution of the SQL statement.
failpoint.Inject("retryableError", func(val failpoint.Value) {
if mark, ok := val.(string); ok {
Expand All @@ -140,11 +142,13 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
return 0, terror.ErrDBUnExpect.Generate("database connection not valid")
}

startTime := time.Now()
txn, err := conn.DBConn.BeginTx(tctx.Context(), nil)

if err != nil {
return 0, terror.ErrDBExecuteFailed.Delegate(err, "begin")
}
hVec.WithLabelValues("begin", task).Observe(time.Since(startTime).Seconds())

l := len(queries)

Expand All @@ -158,8 +162,11 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)))

startTime = time.Now()
_, err = txn.ExecContext(tctx.Context(), query, arg...)
if err != nil {
if err == nil {
hVec.WithLabelValues("stmt", task).Observe(time.Since(startTime).Seconds())
} else {
if ignoreErr != nil && ignoreErr(err) {
tctx.L().Warn("execute statement failed and will ignore this error",
zap.String("query", utils.TruncateString(query, -1)),
Expand All @@ -172,30 +179,35 @@ func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, ignoreEr
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)), log.ShortError(err))

startTime = time.Now()
rerr := txn.Rollback()
if rerr != nil {
tctx.L().Error("rollback failed",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(arg, -1)),
log.ShortError(rerr))
} else {
hVec.WithLabelValues("rollback", task).Observe(time.Since(startTime).Seconds())
}
// we should return the exec err, instead of the rollback rerr.
return i, terror.ErrDBExecuteFailed.Delegate(err, utils.TruncateString(query, -1))
}
}
startTime = time.Now()
err = txn.Commit()
if err != nil {
return l - 1, terror.ErrDBExecuteFailed.Delegate(err, "commit") // mark failed on the last one
}
hVec.WithLabelValues("commit", task).Observe(time.Since(startTime).Seconds())
return l, nil
}

// ExecuteSQL executes sql on real DB,
// return
// 1. failed: (the index of sqls executed error, error)
// 2. succeed: (len(sqls), nil)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, nil, queries, args...)
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error) {
return conn.ExecuteSQLWithIgnoreError(tctx, hVec, task, nil, queries, args...)
}

// ApplyRetryStrategy apply specify strategy for BaseConn
Expand Down
22 changes: 17 additions & 5 deletions pkg/conn/baseconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/prometheus/client_golang/prometheus"
)

func TestSuite(t *testing.T) {
Expand All @@ -34,6 +35,17 @@ var _ = Suite(&testBaseConnSuite{})
type testBaseConnSuite struct {
}

var (
testStmtHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "conn",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})
)

func (t *testBaseConnSuite) TestBaseConn(c *C) {
baseConn := NewBaseConn(nil, nil)

Expand All @@ -44,7 +56,7 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {
_, err = baseConn.QuerySQL(tctx, "select 1")
c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)

_, err = baseConn.ExecuteSQL(tctx, []string{""})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""})
c.Assert(terror.ErrDBUnExpect.Equal(err), IsTrue)

db, mock, err := sqlmock.New()
Expand Down Expand Up @@ -74,24 +86,24 @@ func (t *testBaseConnSuite) TestBaseConn(c *C) {
_, err = baseConn.QuerySQL(tctx, "select 1")
c.Assert(terror.ErrDBQueryFailed.Equal(err), IsTrue)

affected, _ := baseConn.ExecuteSQL(tctx, []string{""})
affected, _ := baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{""})
c.Assert(affected, Equals, 0)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
affected, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
affected, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(err, IsNil)
c.Assert(affected, Equals, 1)

mock.ExpectBegin().WillReturnError(errors.New("begin error"))
_, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue)

mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnError(errors.New("invalid connection"))
mock.ExpectRollback()
_, err = baseConn.ExecuteSQL(tctx, []string{"create database test"})
_, err = baseConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(terror.ErrDBExecuteFailed.Equal(err), IsTrue)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/conn/basedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (t *testBaseDBSuite) TestGetBaseConn(c *C) {
mock.ExpectBegin()
mock.ExpectExec("create database test").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
affected, err := dbConn.ExecuteSQL(tctx, []string{"create database test"})
affected, err := dbConn.ExecuteSQL(tctx, testStmtHistogram, "test", []string{"create database test"})
c.Assert(err, IsNil)
c.Assert(affected, Equals, 1)
}
6 changes: 3 additions & 3 deletions relay/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var (
Subsystem: "relay",
Name: "write_duration",
Help: "bucketed histogram of write time (s) of single relay log event",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

// should alert
Expand All @@ -110,7 +110,7 @@ var (
Subsystem: "relay",
Name: "read_binlog_duration",
Help: "bucketed histogram of read time (s) of single binlog event from the master.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

binlogTransformDurationHistogram = prometheus.NewHistogram(
Expand All @@ -119,7 +119,7 @@ var (
Subsystem: "relay",
Name: "read_transform_duration",
Help: "bucketed histogram of transform time (s) of single binlog event.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
})

// should alert
Expand Down
21 changes: 18 additions & 3 deletions syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,19 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
tctx,
params,
func(ctx *tcontext.Context) (interface{}, error) {
return conn.baseConn.QuerySQL(ctx, query, args...)
startTime := time.Now()
ret, err := conn.baseConn.QuerySQL(ctx, query, args...)
if err == nil {
cost := time.Since(startTime)
queryHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("query statement",
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return ret, err
},
)

Expand Down Expand Up @@ -265,12 +277,15 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, ignoreError, queries, args...)
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, stmtHistogram, conn.cfg.Name, ignoreError, queries, args...)
if err == nil {
cost := time.Since(startTime)
txnHistogram.WithLabelValues(conn.cfg.Name).Observe(cost.Seconds())
if cost.Seconds() > 1 {
ctx.L().Warn("transaction execute successfully", zap.Duration("cost time", cost))
ctx.L().Warn("execute transaction",
zap.String("query", utils.TruncateInterface(queries, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
zap.Duration("cost time", cost))
}
}
return ret, err
Expand Down
61 changes: 60 additions & 1 deletion syncer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ import (
)

var (
binlogReadDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "read_binlog_duration",
Help: "bucketed histogram of read time (s) for single binlog event from the relay log or master.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
}, []string{"task"})

binlogEventSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "binlog_event_size",
Help: "size of a binlog event",
Buckets: prometheus.ExponentialBuckets(16, 2, 20),
}, []string{"task"})

binlogEvent = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Expand All @@ -37,6 +55,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})

conflictDetectDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "conflict_detect_duration",
Help: "bucketed histogram of conflict detect time (s) for single DML statement",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 21),
}, []string{"task"})

binlogSkippedEventsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
Expand All @@ -61,6 +88,14 @@ var (
Help: "total number of finished jobs",
}, []string{"type", "task", "queueNo"})

queueSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "queue_size",
Help: "remain size of the DML queue",
}, []string{"task", "queueNo"})

binlogPosGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Expand All @@ -82,7 +117,7 @@ var (
Namespace: "dm",
Subsystem: "syncer",
Name: "sql_retries_total",
Help: "total number of sql retryies",
Help: "total number of sql retries",
}, []string{"type", "task"})

txnHistogram = prometheus.NewHistogramVec(
Expand All @@ -94,6 +129,24 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

queryHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "query_duration_time",
Help: "Bucketed histogram of query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"task"})

stmtHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "stmt_duration_time",
Help: "Bucketed histogram of every statement query time (s).",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})

// FIXME: should I move it to dm-worker?
cpuUsageGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -148,14 +201,20 @@ var (

// RegisterMetrics registers metrics
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(binlogReadDurationHistogram)
registry.MustRegister(binlogEventSizeHistogram)
registry.MustRegister(binlogEvent)
registry.MustRegister(conflictDetectDurationHistogram)
registry.MustRegister(binlogSkippedEventsTotal)
registry.MustRegister(addedJobsTotal)
registry.MustRegister(finishedJobsTotal)
registry.MustRegister(queueSizeGauge)
registry.MustRegister(sqlRetriesTotal)
registry.MustRegister(binlogPosGauge)
registry.MustRegister(binlogFileGauge)
registry.MustRegister(txnHistogram)
registry.MustRegister(stmtHistogram)
registry.MustRegister(queryHistogram)
registry.MustRegister(cpuUsageGauge)
registry.MustRegister(syncerExitWithErrorCounter)
registry.MustRegister(replicationLagGauge)
Expand Down
Loading