Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql, sql, ui: update SQL metrics #12299

Merged
merged 1 commit into from
Dec 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 58 additions & 36 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ const sqlImplicitTxnName string = "sql txn implicit"

// Fully-qualified names for metrics.
var (
MetaLatency = metric.Metadata{Name: "sql.latency"}
MetaTxnBegin = metric.Metadata{Name: "sql.txn.begin.count"}
MetaTxnCommit = metric.Metadata{Name: "sql.txn.commit.count"}
MetaTxnAbort = metric.Metadata{Name: "sql.txn.abort.count"}
MetaTxnRollback = metric.Metadata{Name: "sql.txn.rollback.count"}
MetaSelect = metric.Metadata{Name: "sql.select.count"}
MetaUpdate = metric.Metadata{Name: "sql.update.count"}
MetaInsert = metric.Metadata{Name: "sql.insert.count"}
MetaDelete = metric.Metadata{Name: "sql.delete.count"}
MetaDdl = metric.Metadata{Name: "sql.ddl.count"}
MetaMisc = metric.Metadata{Name: "sql.misc.count"}
MetaQuery = metric.Metadata{Name: "sql.query.count"}
MetaLatency = metric.Metadata{Name: "sql.latency"}
MetaTxnBegin = metric.Metadata{Name: "sql.txn.begin.count"}
MetaTxnCommit = metric.Metadata{Name: "sql.txn.commit.count"}
MetaTxnAbort = metric.Metadata{Name: "sql.txn.abort.count"}
MetaTxnRollback = metric.Metadata{Name: "sql.txn.rollback.count"}
MetaSelect = metric.Metadata{Name: "sql.select.count"}
MetaDistSQLSelect = metric.Metadata{Name: "sql.distsql.select.count"}
MetaUpdate = metric.Metadata{Name: "sql.update.count"}
MetaInsert = metric.Metadata{Name: "sql.insert.count"}
MetaDelete = metric.Metadata{Name: "sql.delete.count"}
MetaDdl = metric.Metadata{Name: "sql.ddl.count"}
MetaMisc = metric.Metadata{Name: "sql.misc.count"}
MetaQuery = metric.Metadata{Name: "sql.query.count"}
)

// distSQLExecMode controls if and when the Executor uses DistSQL.
Expand Down Expand Up @@ -194,9 +195,11 @@ type Executor struct {
virtualSchemas virtualSchemaHolder

// Transient stats.
Latency *metric.Histogram
SelectCount *metric.Counter
TxnBeginCount *metric.Counter
Latency *metric.Histogram
SelectCount *metric.Counter
// The subset of SELECTs that are processed through DistSQL.
DistSQLSelectCount *metric.Counter
TxnBeginCount *metric.Counter

// txnCommitCount counts the number of times a COMMIT was attempted.
TxnCommitCount *metric.Counter
Expand Down Expand Up @@ -290,18 +293,19 @@ func NewExecutor(cfg ExecutorConfig, stopper *stop.Stopper) *Executor {
stopper: stopper,
reCache: parser.NewRegexpCache(512),

Latency: metric.NewLatency(MetaLatency, cfg.MetricsSampleInterval),
TxnBeginCount: metric.NewCounter(MetaTxnBegin),
TxnCommitCount: metric.NewCounter(MetaTxnCommit),
TxnAbortCount: metric.NewCounter(MetaTxnAbort),
TxnRollbackCount: metric.NewCounter(MetaTxnRollback),
SelectCount: metric.NewCounter(MetaSelect),
UpdateCount: metric.NewCounter(MetaUpdate),
InsertCount: metric.NewCounter(MetaInsert),
DeleteCount: metric.NewCounter(MetaDelete),
DdlCount: metric.NewCounter(MetaDdl),
MiscCount: metric.NewCounter(MetaMisc),
QueryCount: metric.NewCounter(MetaQuery),
Latency: metric.NewLatency(MetaLatency, cfg.MetricsSampleInterval),
TxnBeginCount: metric.NewCounter(MetaTxnBegin),
TxnCommitCount: metric.NewCounter(MetaTxnCommit),
TxnAbortCount: metric.NewCounter(MetaTxnAbort),
TxnRollbackCount: metric.NewCounter(MetaTxnRollback),
SelectCount: metric.NewCounter(MetaSelect),
DistSQLSelectCount: metric.NewCounter(MetaDistSQLSelect),
UpdateCount: metric.NewCounter(MetaUpdate),
InsertCount: metric.NewCounter(MetaInsert),
DeleteCount: metric.NewCounter(MetaDelete),
DdlCount: metric.NewCounter(MetaDdl),
MiscCount: metric.NewCounter(MetaMisc),
QueryCount: metric.NewCounter(MetaQuery),
}
}

Expand Down Expand Up @@ -590,7 +594,10 @@ func (e *Executor) execRequest(session *Session, sql string, copymsg copyMsg) St
var results []Result
origState := txnState.State

// Track if we are retrying this query, so that we do not double count.
isAutomaticRetry := false
txnClosure := func(txn *client.Txn, opt *client.TxnExecOptions) error {
defer func() { isAutomaticRetry = true }()
if txnState.State == Open && txnState.txn != txn {
panic(fmt.Sprintf("closure wasn't called in the txn we set up for it."+
"\ntxnState.txn:%+v\ntxn:%+v\ntxnState:%+v", txnState.txn, txn, txnState))
Expand All @@ -606,7 +613,7 @@ func (e *Executor) execRequest(session *Session, sql string, copymsg copyMsg) St
// Some results were produced by a previous attempt. Discard them.
ResultList(results).Close()
}
results, remainingStmts, err = runTxnAttempt(e, planMaker, origState, txnState, opt, stmtsToExec)
results, remainingStmts, err = runTxnAttempt(e, planMaker, origState, txnState, opt, stmtsToExec, isAutomaticRetry)

// TODO(andrei): Until #7881 fixed.
if err == nil && txnState.State == Aborted {
Expand Down Expand Up @@ -742,6 +749,7 @@ func runTxnAttempt(
txnState *txnState,
opt *client.TxnExecOptions,
stmts parser.StatementList,
isAutomaticRetry bool,
) ([]Result, parser.StatementList, error) {

// Ignore the state that might have been set by a previous try
Expand All @@ -752,7 +760,7 @@ func runTxnAttempt(
planMaker.setTxn(txnState.txn)
results, remainingStmts, err := e.execStmtsInCurrentTxn(
stmts, planMaker, txnState,
opt.AutoCommit /* implicitTxn */, opt.AutoRetry /* txnBeginning */)
opt.AutoCommit /* implicitTxn */, opt.AutoRetry /* txnBeginning */, isAutomaticRetry)
if opt.AutoCommit {
if len(remainingStmts) > 0 {
panic("implicit txn failed to execute all stmts")
Expand Down Expand Up @@ -797,6 +805,7 @@ func (e *Executor) execStmtsInCurrentTxn(
txnState *txnState,
implicitTxn bool,
txnBeginning bool,
isAutomaticRetry bool,
) ([]Result, parser.StatementList, error) {
var results []Result
if txnState.State == NoTxn {
Expand Down Expand Up @@ -833,7 +842,7 @@ func (e *Executor) execStmtsInCurrentTxn(
case Open:
res, err = e.execStmtInOpenTxn(
stmt, planMaker, implicitTxn, txnBeginning && (i == 0), /* firstInTxn */
txnState)
txnState, isAutomaticRetry)
case Aborted, RestartWait:
res, err = e.execStmtInAbortedTxn(stmt, txnState, planMaker)
case CommitWait:
Expand Down Expand Up @@ -967,12 +976,19 @@ func (e *Executor) execStmtInCommitWaitTxn(
// firstInTxn: set for the first statement in a transaction. Used
// so that nested BEGIN statements are caught.
// stmtTimestamp: Used as the statement_timestamp().
// isAutomaticRetry: A boolean that is set for retries so that we don't double
// count in metrics.
//
// Returns:
// - a Result
// - an error, if any. In case of error, the result returned also reflects this error.
func (e *Executor) execStmtInOpenTxn(
stmt parser.Statement, planMaker *planner, implicitTxn bool, firstInTxn bool, txnState *txnState,
stmt parser.Statement,
planMaker *planner,
implicitTxn bool,
firstInTxn bool,
txnState *txnState,
isAutomaticRetry bool,
) (Result, error) {
if txnState.State != Open {
panic("execStmtInOpenTxn called outside of an open txn")
Expand All @@ -987,8 +1003,10 @@ func (e *Executor) execStmtInOpenTxn(
session := planMaker.session
log.Eventf(session.context, "%s", stmt)

// TODO(cdo): Figure out how to not double count on retries.
e.updateStmtCounts(stmt)
// Do not double count automatically retried transactions.
if !isAutomaticRetry {
e.updateStmtCounts(stmt)
}
switch s := stmt.(type) {
case *parser.BeginTransaction:
if !firstInTxn {
Expand Down Expand Up @@ -1087,7 +1105,7 @@ func (e *Executor) execStmtInOpenTxn(
}

autoCommit := implicitTxn && !e.cfg.TestingKnobs.DisableAutoCommit
result, err := e.execStmt(stmt, planMaker, autoCommit)
result, err := e.execStmt(stmt, planMaker, autoCommit, isAutomaticRetry)
if err != nil {
if result.Rows != nil {
result.Rows.Close()
Expand Down Expand Up @@ -1278,7 +1296,7 @@ func (e *Executor) shouldUseDistSQL(session *Session, plan planNode) (bool, erro
// The current transaction might have been committed/rolled back when this returns.
// The caller closes result.Rows (even in error cases).
func (e *Executor) execStmt(
stmt parser.Statement, planMaker *planner, autoCommit bool,
stmt parser.Statement, planMaker *planner, autoCommit bool, isAutomaticRetry bool,
) (Result, error) {
plan, err := planMaker.makePlan(stmt, autoCommit)
if err != nil {
Expand All @@ -1305,7 +1323,11 @@ func (e *Executor) execStmt(
if err != nil {
return result, err
}
if useDistSQL {
if useDistSQL && !isAutomaticRetry {
switch stmt.(type) {
case *parser.Select:
e.DistSQLSelectCount.Inc(1)
}
err = e.execDistSQL(planMaker, plan, &result)
} else {
err = e.execClassic(plan, &result)
Expand Down
143 changes: 77 additions & 66 deletions pkg/sql/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,84 +28,95 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

type queryCounter struct {
query string
txnBeginCount int64
selectCount int64
distSQLSelectCount int64
updateCount int64
insertCount int64
deleteCount int64
ddlCount int64
miscCount int64
txnCommitCount int64
txnRollbackCount int64
}

func TestQueryCounts(t *testing.T) {
defer leaktest.AfterTest(t)()
params, _ := createTestServerParams()
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop()

var testcases = []struct {
query string
txnBeginCount int64
selectCount int64
updateCount int64
insertCount int64
deleteCount int64
ddlCount int64
miscCount int64
txnCommitCount int64
txnRollbackCount int64
}{
{"", 0, 0, 0, 0, 0, 1, 0, 0, 0},
{"BEGIN; END", 1, 0, 0, 0, 0, 1, 0, 1, 0},
{"SELECT 1", 1, 1, 0, 0, 0, 1, 0, 1, 0},
{"CREATE DATABASE mt", 1, 1, 0, 0, 0, 2, 0, 1, 0},
{"CREATE TABLE mt.n (num INTEGER)", 1, 1, 0, 0, 0, 3, 0, 1, 0},
{"INSERT INTO mt.n VALUES (3)", 1, 1, 0, 1, 0, 3, 0, 1, 0},
{"UPDATE mt.n SET num = num + 1", 1, 1, 1, 1, 0, 3, 0, 1, 0},
{"DELETE FROM mt.n", 1, 1, 1, 1, 1, 3, 0, 1, 0},
{"ALTER TABLE mt.n ADD COLUMN num2 INTEGER", 1, 1, 1, 1, 1, 4, 0, 1, 0},
{"EXPLAIN SELECT * FROM mt.n", 1, 1, 1, 1, 1, 4, 1, 1, 0},
{"BEGIN; UPDATE mt.n SET num = num + 1; END", 2, 1, 2, 1, 1, 4, 1, 2, 0},
{"SELECT * FROM mt.n; SELECT * FROM mt.n; SELECT * FROM mt.n", 2, 4, 2, 1, 1, 4, 1, 2, 0},
{"DROP TABLE mt.n", 2, 4, 2, 1, 1, 5, 1, 2, 0},
{"SET database = system", 2, 4, 2, 1, 1, 5, 2, 2, 0},
}

var testcases = []queryCounter{
// The counts are deltas for each query.
{"", 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{"BEGIN; END", 1, 0, 0, 0, 0, 0, 0, 0, 1, 0},
{"SELECT 1", 0, 1, 0, 0, 0, 0, 0, 0, 1, 0},
{"CREATE DATABASE mt", 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{"CREATE TABLE mt.n (num INTEGER)", 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{"INSERT INTO mt.n VALUES (3)", 0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
{"UPDATE mt.n SET num = num + 1", 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
{"DELETE FROM mt.n", 0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
{"ALTER TABLE mt.n ADD COLUMN num2 INTEGER", 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{"EXPLAIN SELECT * FROM mt.n", 0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
{"BEGIN; UPDATE mt.n SET num = num + 1; END", 1, 0, 0, 1, 0, 0, 0, 0, 1, 0},
{"SELECT * FROM mt.n; SELECT * FROM mt.n; SELECT * FROM mt.n", 0, 3, 0, 0, 0, 0, 0, 0, 0, 0},
{"SET DIST_SQL = 'on'", 0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
{"SELECT * FROM mt.n", 0, 1, 1, 0, 0, 0, 0, 0, 0, 0},
{"SET DIST_SQL = 'off'", 0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
{"DROP TABLE mt.n", 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
{"SET database = system", 0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
}

accum := testcases[0]
for _, tc := range testcases {
if tc.query != "" {
if tc.query == "" {
continue
}

t.Run(tc.query, func(t *testing.T) {
if _, err := sqlDB.Exec(tc.query); err != nil {
t.Fatalf("unexpected error executing '%s': %s'", tc.query, err)
}
}

// Force metric snapshot refresh.
if err := s.WriteSummaries(); err != nil {
t.Fatal(err)
}

if err := checkCounterEQ(s, sql.MetaTxnBegin, tc.txnBeginCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaTxnRollback, tc.txnRollbackCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaTxnAbort, 0); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaSelect, tc.selectCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaUpdate, tc.updateCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaInsert, tc.insertCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaDelete, tc.deleteCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaDdl, tc.ddlCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaMisc, tc.miscCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
}
// Force metric snapshot refresh.
if err := s.WriteSummaries(); err != nil {
t.Fatal(err)
}

// Everything after this query will also fail, so quit now to avoid deluge of errors.
if t.Failed() {
t.FailNow()
var err error
if accum.txnBeginCount, err = checkCounterDelta(s, sql.MetaTxnBegin, accum.txnBeginCount, tc.txnBeginCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.distSQLSelectCount, err = checkCounterDelta(s, sql.MetaDistSQLSelect, accum.distSQLSelectCount, tc.distSQLSelectCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.txnRollbackCount, err = checkCounterDelta(s, sql.MetaTxnRollback, accum.txnRollbackCount, tc.txnRollbackCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if err := checkCounterEQ(s, sql.MetaTxnAbort, 0); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.selectCount, err = checkCounterDelta(s, sql.MetaSelect, accum.selectCount, tc.selectCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.updateCount, err = checkCounterDelta(s, sql.MetaUpdate, accum.updateCount, tc.updateCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.insertCount, err = checkCounterDelta(s, sql.MetaInsert, accum.insertCount, tc.insertCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.deleteCount, err = checkCounterDelta(s, sql.MetaDelete, accum.deleteCount, tc.deleteCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.ddlCount, err = checkCounterDelta(s, sql.MetaDdl, accum.ddlCount, tc.ddlCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
if accum.miscCount, err = checkCounterDelta(s, sql.MetaMisc, accum.miscCount, tc.miscCount); err != nil {
t.Errorf("%q: %s", tc.query, err)
}
})
}
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/sql/metric_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,26 @@ import (
"github.com/pkg/errors"
)

func checkCounterEQ(s serverutils.TestServerInterface, meta metric.Metadata, e int64) error {
if a := s.MustGetSQLCounter(meta.Name); a != e {
return errors.Errorf("stat %s: actual %d != expected %d", meta.Name, a, e)
func checkCounterDelta(
s serverutils.TestServerInterface, meta metric.Metadata, init, delta int64,
) (int64, error) {
actual := s.MustGetSQLCounter(meta.Name)
if actual != (init + delta) {
return actual, errors.Errorf("query %s: actual %d != (init %d + delta %d)",
meta.Name, actual, init, delta)
}
return nil
return actual, nil
}

func checkCounterEQ(s serverutils.TestServerInterface, meta metric.Metadata, e int64) error {
_, err := checkCounterDelta(s, meta, 0, e)
return err
}

func checkCounterGE(s serverutils.TestServerInterface, meta metric.Metadata, e int64) error {
if a := s.MustGetSQLCounter(meta.Name); a < e {
return errors.Errorf("stat %s: expected: actual %d >= %d", meta.Name, a, e)
return errors.Errorf("stat %s: expected: actual %d >= %d",
meta.Name, a, e)
}
return nil
}
Loading