Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Errant GTID Metrics Refactor #13670

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/vtorc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func main() {
servenv.ParseFlags("vtorc")
servenv.Init()
config.UpdateConfigValuesFromFlags()
inst.RegisterStats()

log.Info("starting vtorc")
if len(configFile) > 0 {
Expand Down
64 changes: 51 additions & 13 deletions go/test/endtoend/vtorc/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"encoding/json"
"fmt"
"math"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -200,22 +201,59 @@ func TestAPIEndpoints(t *testing.T) {
assert.Equal(t, 400, status, resp)
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)

// Also verify that the metric for errant GTIDs is reporting the correct information.
_, resp, err = utils.MakeAPICall(t, vtorc, "/debug/vars")
// Also verify that we see the tablet in the errant GTIDs API call
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids")
require.NoError(t, err)
resultMap := make(map[string]any)
err = json.Unmarshal([]byte(resp), &resultMap)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any public API, I would like us to use TabletAlias rather than InstanceAlias. Both mean the same thing in Vitess, but InstanceAlias is an artifact of the fact that we imported code from orchestrator, and having two terms for the same thing can be confusing to users.
It would also be good to replace Instance/Instances with Tablet/Tablets in function names, tests, etc.


// Check that filtering using keyspace and shard works
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks&shard=0")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace works
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks")
require.NoError(t, err)
assert.Equal(t, 200, status, resp)
assert.Contains(t, resp, fmt.Sprintf(`"InstanceAlias": "%v"`, replica.Alias))

// Check that filtering using keyspace and shard works
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?keyspace=ks&shard=80-")
require.NoError(t, err)
errantGTIDMap := reflect.ValueOf(resultMap["ErrantGtidMap"])
errantGtidTablets := errantGTIDMap.MapKeys()
require.Len(t, errantGtidTablets, 3)
assert.Equal(t, 200, status, resp)
assert.Equal(t, "null", resp)

errantGTIDinReplica := ""
for _, tabletKey := range errantGtidTablets {
if tabletKey.String() == replica.Alias {
errantGTIDinReplica = errantGTIDMap.MapIndex(tabletKey).Interface().(string)
// Check that filtering using just the shard fails
status, resp, err = utils.MakeAPICall(t, vtorc, "/api/errant-gtids?shard=0")
require.NoError(t, err)
assert.Equal(t, 400, status, resp)
assert.Equal(t, "Filtering by shard without keyspace isn't supported\n", resp)

// Also verify that the metric for errant GTIDs is reporting the correct count.
waitForErrantGTIDCount(t, vtorc, 1)
})
}

// waitForErrantGTIDCount repeatedly requests "/debug/vars" through
// utils.MakeAPICall until the "ErrantGtidTabletCount" entry of the decoded JSON,
// rounded to the nearest integer, equals errantGTIDCountWanted. If that does not
// happen within 15 seconds the test is failed with t.Fatalf.
func waitForErrantGTIDCount(t *testing.T, vtorc *cluster.VTOrcProcess, errantGTIDCountWanted int) {
// A single deadline channel is created once and shared by every loop iteration.
timeout := time.After(15 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("Timed out waiting for errant gtid count in the metrics to be %v", errantGTIDCountWanted)
return
default:
// Deadline not reached yet: fetch the variables and decode them into a generic map.
_, resp, err := utils.MakeAPICall(t, vtorc, "/debug/vars")
require.NoError(t, err)
resultMap := make(map[string]any)
err = json.Unmarshal([]byte(resp), &resultMap)
require.NoError(t, err)
// encoding/json decodes JSON numbers stored in an `any` as float64, which is
// why Float() is used below. reflect.Value.Float panics on any other kind
// (including the zero Value produced when the key is absent), so this relies
// on the metric being present in every response.
errantGTIDTabletsCount := reflect.ValueOf(resultMap["ErrantGtidTabletCount"])
if int(math.Round(errantGTIDTabletsCount.Float())) == errantGTIDCountWanted {
return
}
// Pause before the next poll.
time.Sleep(100 * time.Millisecond)
}
// NOTE(review): the next two lines reference errantGTIDinReplica, which is not
// declared in this function; they look like lines from the previous version of
// the test interleaved by the diff view — confirm.
require.NotEmpty(t, errantGTIDinReplica)
})
}
}
37 changes: 20 additions & 17 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ const (
var instanceReadChan = make(chan bool, backendDBConcurrency)
var instanceWriteChan = make(chan bool, backendDBConcurrency)

var (
// Mutex to protect the access of the following variable
errantGtidMapMu = sync.Mutex{}
errantGtidMap = make(map[string]string)
)

var forgetAliases *cache.Cache

var accessDeniedCounter = metrics.NewCounter()
Expand All @@ -81,11 +75,6 @@ func init() {
_ = metrics.Register("instance.write", writeInstanceCounter)
_ = writeBufferLatency.AddMany([]string{"wait", "write"})
writeBufferLatency.Start("wait")
stats.NewStringMapFuncWithMultiLabels("ErrantGtidMap", "Metric to track the errant GTIDs detected by VTOrc", []string{"TabletAlias"}, "ErrantGtid", func() map[string]string {
errantGtidMapMu.Lock()
defer errantGtidMapMu.Unlock()
return errantGtidMap
})

go initializeInstanceDao()
}
Expand Down Expand Up @@ -156,6 +145,14 @@ func logReadTopologyInstanceError(tabletAlias string, hint string, err error) er
return fmt.Errorf(msg)
}

// RegisterStats registers stats from the inst package.
//
// It registers one gauge function, "ErrantGtidTabletCount", whose callback calls
// ReadInstancesWithErrantGTIds with empty keyspace and shard filters and reports
// the number of instances returned.
func RegisterStats() {
stats.NewGaugeFunc("ErrantGtidTabletCount", "Number of tablets with errant GTIDs", func() int64 {
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
// The error from the read is discarded, so a failed read is reported as the
// length of whatever slice came back (presumably 0 for a nil slice — confirm
// against readInstancesByCondition).
instances, _ := ReadInstancesWithErrantGTIds("", "")
return int64(len(instances))
})
}

// ReadTopologyInstance collects information on the state of a MySQL
// server and writes the result synchronously to the vtorc
// backend.
Expand Down Expand Up @@ -385,12 +382,6 @@ Cleanup:
instance.GtidErrant, err = vitessmysql.Subtract(redactedExecutedGtidSet.String(), redactedPrimaryExecutedGtidSet.String())
}
}
// update the errant gtid map
go func() {
errantGtidMapMu.Lock()
defer errantGtidMapMu.Unlock()
errantGtidMap[topoproto.TabletAliasString(tablet.Alias)] = instance.GtidErrant
}()
}

latency.Stop("instance")
Expand Down Expand Up @@ -682,6 +673,18 @@ func ReadProblemInstances(keyspace string, shard string) ([](*Instance), error)
return readInstancesByCondition(condition, args, "")
}

// ReadInstancesWithErrantGTIds returns the instances whose gtid_errant column is
// non-empty, optionally narrowed by keyspace and shard.
//
// An empty keyspace or shard disables that filter: the SQL substitutes the '%'
// pattern for it. A non-empty value is used as the right-hand side of a LIKE
// comparison. The result and error of readInstancesByCondition are returned
// unchanged.
func ReadInstancesWithErrantGTIds(keyspace string, shard string) ([]*Instance, error) {
	// Each filter value is bound twice: once for the emptiness check and once
	// as the LIKE pattern.
	queryArgs := sqlutils.Args(keyspace, keyspace, shard, shard)

	const errantFilter = `
keyspace LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END)
and shard LIKE (CASE WHEN ? = '' THEN '%' ELSE ? END)
and gtid_errant != ''
`
	return readInstancesByCondition(errantFilter, queryArgs, "")
}

// GetKeyspaceShardName gets the keyspace shard name for the given instance key
func GetKeyspaceShardName(tabletAlias string) (keyspace string, shard string, err error) {
query := `
Expand Down
84 changes: 84 additions & 0 deletions go/vt/vtorc/inst/instance_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,90 @@ func TestReadProblemInstances(t *testing.T) {
}
}

// TestReadInstancesWithErrantGTIds checks ReadInstancesWithErrantGTIds with and
// without keyspace/shard filters.
//
// Every case starts from initialSQL, then runs its own extra statements to reach
// the state under test, and finally compares the aliases of the returned
// instances with the expected set (order is not significant).
func TestReadInstancesWithErrantGTIds(t *testing.T) {
	const (
		// errantAlias is the tablet that the cases below mark as having an errant GTID.
		errantAlias = "zone1-0000000112"
		// markErrant gives that tablet a non-empty gtid_errant value.
		markErrant = "update database_instance set gtid_errant = '729a4cc4-8680-11ed-a104-47706090afbd:1' where alias = 'zone1-0000000112'"
	)

	type testCase struct {
		name        string
		keyspace    string
		shard       string
		extraSQL    []string
		wantAliases []string
	}
	testCases := []testCase{
		{
			name:        "No instances with errant GTID",
			extraSQL:    nil,
			wantAliases: nil,
		},
		{
			name:        "errant GTID",
			extraSQL:    []string{markErrant},
			wantAliases: []string{errantAlias},
		},
		{
			name:        "keyspace filtering - success",
			keyspace:    "ks",
			extraSQL:    []string{markErrant},
			wantAliases: []string{errantAlias},
		},
		{
			name:        "keyspace filtering - failure",
			keyspace:    "unknown",
			extraSQL:    []string{markErrant},
			wantAliases: nil,
		},
		{
			name:        "shard filtering - success",
			keyspace:    "ks",
			shard:       "0",
			extraSQL:    []string{markErrant},
			wantAliases: []string{errantAlias},
		},
		{
			name:        "shard filtering - failure",
			keyspace:    "ks",
			shard:       "unknown",
			extraSQL:    []string{markErrant},
			wantAliases: nil,
		},
	}

	// Raise InstancePollSeconds to one hundred years' worth of seconds for the
	// duration of the test and restore the previous value afterwards. This is
	// intended to keep instances from being treated as problematic because their
	// last_checked is very old.
	// NOTE(review): confirm this setting actually influences an errant-GTID read.
	savedPollSeconds := config.Config.InstancePollSeconds
	config.Config.InstancePollSeconds = 60 * 60 * 24 * 365 * 100
	defer func() {
		config.Config.InstancePollSeconds = savedPollSeconds
	}()

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Leave a clean database behind for the next case.
			defer db.ClearVTOrcDatabase()

			// Seed the database, then apply the case-specific statements.
			statements := append(initialSQL, tc.extraSQL...)
			for _, stmt := range statements {
				_, err := db.ExecVTOrc(stmt)
				require.NoError(t, err)
			}

			found, err := ReadInstancesWithErrantGTIds(tc.keyspace, tc.shard)
			require.NoError(t, err)

			var gotAliases []string
			for i := range found {
				gotAliases = append(gotAliases, found[i].InstanceAlias)
			}
			require.ElementsMatch(t, gotAliases, tc.wantAliases)
		})
	}
}

// TestReadInstancesByCondition is used to test the functionality of readInstancesByCondition and verify its failure modes and successes.
func TestReadInstancesByCondition(t *testing.T) {
tests := []struct {
Expand Down
24 changes: 23 additions & 1 deletion go/vt/vtorc/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type vtorcAPI struct{}

const (
problemsAPI = "/api/problems"
errantGTIDsAPI = "/api/errant-gtids"
disableGlobalRecoveriesAPI = "/api/disable-global-recoveries"
enableGlobalRecoveriesAPI = "/api/enable-global-recoveries"
replicationAnalysisAPI = "/api/replication-analysis"
Expand All @@ -55,6 +56,7 @@ var (
apiHandler = &vtorcAPI{}
vtorcAPIPaths = []string{
problemsAPI,
errantGTIDsAPI,
disableGlobalRecoveriesAPI,
enableGlobalRecoveriesAPI,
replicationAnalysisAPI,
Expand All @@ -80,6 +82,8 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request
healthAPIHandler(response, request)
case problemsAPI:
problemsAPIHandler(response, request)
case errantGTIDsAPI:
errantGTIDsAPIHandler(response, request)
case replicationAnalysisAPI:
replicationAnalysisAPIHandler(response, request)
case AggregatedDiscoveryMetricsAPI:
Expand All @@ -94,7 +98,7 @@ func (v *vtorcAPI) ServeHTTP(response http.ResponseWriter, request *http.Request
// getACLPermissionLevelForAPI returns the acl permission level that is required to run a given API
func getACLPermissionLevelForAPI(apiEndpoint string) string {
switch apiEndpoint {
case problemsAPI:
case problemsAPI, errantGTIDsAPI:
return acl.MONITORING
case disableGlobalRecoveriesAPI, enableGlobalRecoveriesAPI:
return acl.ADMIN
Expand Down Expand Up @@ -144,6 +148,24 @@ func problemsAPIHandler(response http.ResponseWriter, request *http.Request) {
returnAsJSON(response, http.StatusOK, instances)
}

// errantGTIDsAPIHandler serves the errantGTIDsAPI endpoint.
//
// It reads the optional "keyspace" and "shard" query parameters, answers 400
// when a shard is given without a keyspace, answers 500 with the error text
// when reading the instances fails, and otherwise writes the instances
// returned by inst.ReadInstancesWithErrantGTIds as JSON with status 200.
func errantGTIDsAPIHandler(response http.ResponseWriter, request *http.Request) {
	params := request.URL.Query()
	keyspaceFilter, shardFilter := params.Get("keyspace"), params.Get("shard")

	// Reject a shard filter that arrives without a keyspace filter.
	if keyspaceFilter == "" && shardFilter != "" {
		http.Error(response, shardWithoutKeyspaceFilteringErrorStr, http.StatusBadRequest)
		return
	}

	errantInstances, err := inst.ReadInstancesWithErrantGTIds(keyspaceFilter, shardFilter)
	if err != nil {
		http.Error(response, err.Error(), http.StatusInternalServerError)
		return
	}
	returnAsJSON(response, http.StatusOK, errantInstances)
}

// AggregatedDiscoveryMetricsAPIHandler is the handler for the discovery metrics endpoint
func AggregatedDiscoveryMetricsAPIHandler(response http.ResponseWriter, request *http.Request) {
// return metrics for last x seconds
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtorc/server/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ func TestGetACLPermissionLevelForAPI(t *testing.T) {
{
apiEndpoint: problemsAPI,
want: acl.MONITORING,
}, {
apiEndpoint: errantGTIDsAPI,
want: acl.MONITORING,
}, {
apiEndpoint: disableGlobalRecoveriesAPI,
want: acl.ADMIN,
Expand Down