Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-vitess-r14.0.5: vttablet tx-throttler backports, part 1 #62

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ Usage of vttablet:
Synonym to -enable_consolidator_replicas
--enable-lag-throttler
Synonym to -enable_lag_throttler
--enable-per-workload-table-metrics
If true, query counts and query error metrics include a label that identifies the workload
--enable-query-plan-field-caching
Synonym to -enable_query_plan_field_caching (default true)
--enable-tx-throttler
Expand Down
1,415 changes: 714 additions & 701 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
DirectiveAllowHashJoin = "ALLOW_HASH_JOIN"
// DirectiveQueryPlanner lets the user specify per query which planner should be used
DirectiveQueryPlanner = "PLANNER"
// DirectiveWorkloadName specifies the name of the client application workload issuing the query.
DirectiveWorkloadName = "WORKLOAD_NAME"
)

func isNonSpace(r rune) bool {
Expand Down Expand Up @@ -369,3 +371,17 @@ func CommentsForStatement(stmt Statement) Comments {
}
return nil
}

// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of
// the query directive that specifies it.
func GetWorkloadNameFromStatement(statement Statement) string {
commentedStatement, ok := statement.(Commented)
// This would mean that the statement lacks comments, so we can't obtain the workload from it. Hence default to
// empty workload name
if !ok {
return ""
}

directives := commentedStatement.GetParsedComments().Directives()
return directives.GetString(DirectiveWorkloadName, "")
}
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (t *noopVCursor) SetWorkload(querypb.ExecuteOptions_Workload) {
panic("implement me")
}

func (t *noopVCursor) SetWorkloadName(string) {
panic("implement me")
}

func (t *noopVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) {
panic("implement me")
}
Expand Down Expand Up @@ -694,6 +698,10 @@ func (f *loggingVCursor) SetWorkload(querypb.ExecuteOptions_Workload) {
panic("implement me")
}

func (f *loggingVCursor) SetWorkloadName(string) {
panic("implement me")
}

func (f *loggingVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) {
panic("implement me")
}
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type (
SetTransactionMode(vtgatepb.TransactionMode)
SetWorkload(querypb.ExecuteOptions_Workload)
SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion)
SetWorkloadName(string)
SetFoundRows(uint64)

SetDDLStrategy(string)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt))

setVarComment, err := prepareSetVarComment(vcursor, stmt)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,12 @@ func (vc *vcursorImpl) SetPlannerVersion(v plancontext.PlannerVersion) {
vc.safeSession.GetOrCreateOptions().PlannerVersion = v
}

func (vc *vcursorImpl) SetWorkloadName(workloadName string) {
if workloadName != "" {
vc.safeSession.GetOrCreateOptions().WorkloadName = workloadName
}
}

// SetFoundRows implements the SessionActions interface
func (vc *vcursorImpl) SetFoundRows(foundRows uint64) {
vc.safeSession.FoundRows = foundRows
Expand Down
38 changes: 26 additions & 12 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type QueryEngine struct {
// stats
queryCounts, queryTimes, queryRowCounts, queryErrorCounts, queryRowsAffected, queryRowsReturned *stats.CountersWithMultiLabels

// stats flags
enablePerWorkloadTableMetrics bool

// Loggers
accessCheckerLogger *logutil.ThrottledLogger
}
Expand All @@ -170,11 +173,12 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
}

qe := &QueryEngine{
env: env,
se: se,
tables: make(map[string]*schema.Table),
plans: cache.NewDefaultCacheImpl(cacheCfg),
queryRuleSources: rules.NewMap(),
env: env,
se: se,
tables: make(map[string]*schema.Table),
plans: cache.NewDefaultCacheImpl(cacheCfg),
queryRuleSources: rules.NewMap(),
enablePerWorkloadTableMetrics: config.EnablePerWorkloadTableMetrics,
}

qe.conns = connpool.NewPool(env, "ConnPool", config.OltpReadPool)
Expand Down Expand Up @@ -224,12 +228,18 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
env.Exporter().NewGaugeFunc("QueryCacheSize", "Query engine query cache size", qe.plans.UsedCapacity)
env.Exporter().NewGaugeFunc("QueryCacheCapacity", "Query engine query cache capacity", qe.plans.MaxCapacity)
env.Exporter().NewCounterFunc("QueryCacheEvictions", "Query engine query cache evictions", qe.plans.Evictions)
qe.queryCounts = env.Exporter().NewCountersWithMultiLabels("QueryCounts", "query counts", []string{"Table", "Plan"})
qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", []string{"Table", "Plan"})
qe.queryRowCounts = env.Exporter().NewCountersWithMultiLabels("QueryRowCounts", "(DEPRECATED - use QueryRowsAffected and QueryRowsReturned instead) query row counts", []string{"Table", "Plan"})
qe.queryRowsAffected = env.Exporter().NewCountersWithMultiLabels("QueryRowsAffected", "query rows affected", []string{"Table", "Plan"})
qe.queryRowsReturned = env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", []string{"Table", "Plan"})
qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", []string{"Table", "Plan"})

labels := []string{"Table", "Plan"}
if config.EnablePerWorkloadTableMetrics {
labels = []string{"Table", "Plan", "Workload"}
}

qe.queryCounts = env.Exporter().NewCountersWithMultiLabels("QueryCounts", "query counts", labels)
qe.queryTimes = env.Exporter().NewCountersWithMultiLabels("QueryTimesNs", "query times in ns", labels)
qe.queryRowCounts = env.Exporter().NewCountersWithMultiLabels("QueryRowCounts", "(DEPRECATED - use QueryRowsAffected and QueryRowsReturned instead) query row counts", labels)
qe.queryRowsAffected = env.Exporter().NewCountersWithMultiLabels("QueryRowsAffected", "query rows affected", labels)
qe.queryRowsReturned = env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", labels)
qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", labels)

env.Exporter().HandleFunc("/debug/hotrows", qe.txSerializer.ServeHTTP)
env.Exporter().HandleFunc("/debug/tablet_plans", qe.handleHTTPQueryPlans)
Expand Down Expand Up @@ -443,9 +453,13 @@ func (qe *QueryEngine) QueryPlanCacheLen() int {
}

// AddStats adds the given stats for the planName.tableName
func (qe *QueryEngine) AddStats(planType planbuilder.PlanType, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
func (qe *QueryEngine) AddStats(planType planbuilder.PlanType, tableName, workload string, queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
// table names can contain "." characters, replace them!
keys := []string{tableName, planType.String()}
// Only use the workload as a label if that's enabled in the configuration.
if qe.enablePerWorkloadTableMetrics {
keys = append(keys, workload)
}
qe.queryCounts.Add(keys, queryCount)
qe.queryTimes.Add(keys, int64(duration))
qe.queryRowCounts.Add(keys, rowsAffected)
Expand Down
Loading