diff --git a/ddl/db_test.go b/ddl/db_test.go index 056b28e616384..94c657caafc7f 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -3136,6 +3136,27 @@ func (s *testDBSuite1) TestModifyColumnCharset(c *C) { } +func (s *testDBSuite1) TestSetTableFlashReplica(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.tk.MustExec("use test_db") + s.mustExec(c, "drop table if exists t_flash;") + s.tk.MustExec("create table t_flash(a int, b int)") + defer s.mustExec(c, "drop table t_flash;") + + t := s.testGetTable(c, "t_flash") + c.Assert(t.Meta().TiFlashReplica, IsNil) + + s.tk.MustExec("alter table t_flash set tiflash replica 2 location labels 'a','b';") + t = s.testGetTable(c, "t_flash") + c.Assert(t.Meta().TiFlashReplica, NotNil) + c.Assert(t.Meta().TiFlashReplica.Count, Equals, uint64(2)) + c.Assert(strings.Join(t.Meta().TiFlashReplica.LocationLabels, ","), Equals, strings.Join([]string{"a", "b"}, ",")) + + s.tk.MustExec("alter table t_flash set tiflash replica 0") + t = s.testGetTable(c, "t_flash") + c.Assert(t.Meta().TiFlashReplica, IsNil) +} + func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) { c.Assert(failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`), IsNil) defer func() { diff --git a/ddl/ddl.go b/ddl/ddl.go index fc5461297b43b..f5dc974895603 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -256,6 +256,7 @@ type DDL interface { LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error + UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error // GetLease returns current schema lease time. GetLease() time.Duration diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 8ea7bac831ba9..71700fd4a1b0e 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1998,6 +1998,8 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A return errors.Trace(err) } } + case ast.AlterTableSetTiFlashReplica: + err = d.AlterTableSetTiFlashReplica(ctx, ident, spec.TiFlashReplica) default: // Nothing to do now. } @@ -2988,6 +2990,71 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden return errors.Trace(err) } +// AlterTableSetTiFlashReplica sets the TiFlash replicas info. +func (d *ddl) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast.Ident, replicaInfo *ast.TiFlashReplicaSpec) error { + schema, tb, err := d.getSchemaAndTableByIdent(ctx, ident) + if err != nil { + return errors.Trace(err) + } + + tbReplicaInfo := tb.Meta().TiFlashReplica + if tbReplicaInfo != nil && tbReplicaInfo.Count == replicaInfo.Count && + len(tbReplicaInfo.LocationLabels) == len(replicaInfo.Labels) { + changed := false + for i, lable := range tbReplicaInfo.LocationLabels { + if replicaInfo.Labels[i] != lable { + changed = true + break + } + } + if !changed { + return nil + } + } + + job := &model.Job{ + SchemaID: schema.ID, + TableID: tb.Meta().ID, + SchemaName: schema.Name.L, + Type: model.ActionSetTiFlashReplica, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{*replicaInfo}, + } + err = d.doDDLJob(ctx, job) + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +// UpdateTableReplicaInfo updates the table flash replica infos. +func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error { + is := d.infoHandle.Get() + tb, ok := is.TableByID(tid) + if !ok { + return infoschema.ErrTableNotExists.GenWithStack("Table which ID = %d does not exist.", tid) + } + + if tb.Meta().TiFlashReplica == nil || (tb.Meta().TiFlashReplica.Available == available) { + return nil + } + + db, ok := is.SchemaByTable(tb.Meta()) + if !ok { + return infoschema.ErrDatabaseNotExists.GenWithStack("Database of table `%s` does not exist.", tb.Meta().Name) + } + + job := &model.Job{ + SchemaID: db.ID, + TableID: tb.Meta().ID, + SchemaName: db.Name.L, + Type: model.ActionUpdateTiFlashReplicaStatus, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{available}, + } + err := d.doDDLJob(ctx, job) + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + // checkAlterTableCharset uses to check is it possible to change the charset of table. // This function returns 2 variable: // doNothing: if doNothing is true, means no need to change any more, because the target charset is same with the charset of table. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 334b9b1cabd33..9454fe9ea3a3f 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -557,6 +557,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onLockTables(t, job) case model.ActionUnlockTable: ver, err = onUnlockTables(t, job) + case model.ActionSetTiFlashReplica: + ver, err = onSetTableFlashReplica(t, job) + case model.ActionUpdateTiFlashReplicaStatus: + ver, err = onUpdateFlashReplicaStatus(t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/table.go b/ddl/table.go index e61e281b06f8b..636cfa3588635 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/parser/ast" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/model" field_types "github.com/pingcap/parser/types" @@ -676,6 +677,62 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ return ver, nil } +func onSetTableFlashReplica(t *meta.Meta, job *model.Job) (ver int64, _ error) { + var replicaInfo ast.TiFlashReplicaSpec + if err := job.DecodeArgs(&replicaInfo); err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + + if replicaInfo.Count > 0 { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: replicaInfo.Count, + LocationLabels: replicaInfo.Labels, + } + } else { + tblInfo.TiFlashReplica = nil + } + + ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + return ver, nil +} + +func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ error) { + var available bool + if err := job.DecodeArgs(&available); err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + if tblInfo.TiFlashReplica == nil || (tblInfo.TiFlashReplica.Available == available) { + return ver, nil + } + + if tblInfo.TiFlashReplica != nil { + tblInfo.TiFlashReplica.Available = available + } + + ver, err = updateVersionAndTableInfo(t, job, tblInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + return ver, nil +} + func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error { // d.infoHandle maybe nil in some test. if d.infoHandle == nil || !d.infoHandle.IsValid() { diff --git a/server/http_handler.go b/server/http_handler.go index a010f0a89554c..df44009cb1022 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -344,6 +344,10 @@ type dbTableHandler struct { *tikvHandlerTool } +type flashReplicaHandler struct { + *tikvHandlerTool +} + // regionHandler is the common field for http handler. It contains // some common functions for all handlers. type regionHandler struct { @@ -668,6 +672,83 @@ func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) { binloginfo.DisableSkipBinlogFlag() } +type tableFlashReplicaInfo struct { + // Modifying the field name needs to negotiate with TiFlash colleague. + ID int64 `json:"id"` + ReplicaCount uint64 `json:"replica_count"` + LocationLabels []string `json:"location_labels"` + Available bool `json:"available"` +} + +func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if req.Method == http.MethodPost { + h.handleStatusReport(w, req) + return + } + schema, err := h.schema() + if err != nil { + writeError(w, err) + return + } + replicaInfos := make([]*tableFlashReplicaInfo, 0) + allDBs := schema.AllSchemas() + for _, db := range allDBs { + tables := schema.SchemaTables(db.Name) + for _, tbl := range tables { + tblInfo := tbl.Meta() + if tblInfo.TiFlashReplica == nil { + continue + } + replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{ + ID: tblInfo.ID, + ReplicaCount: tblInfo.TiFlashReplica.Count, + LocationLabels: tblInfo.TiFlashReplica.LocationLabels, + Available: tblInfo.TiFlashReplica.Available, + }) + } + } + writeData(w, replicaInfos) +} + +type tableFlashReplicaStatus struct { + // Modifying the field name needs to negotiate with TiFlash colleague. + ID int64 `json:"id"` + RegionCount uint64 `json:"region_count"` + FlashRegionCount uint64 `json:"flash_region_count"` +} + +// checkTableFlashReplicaAvailable uses to check the available status of table flash replica. +func (tf *tableFlashReplicaStatus) checkTableFlashReplicaAvailable() bool { + return tf.FlashRegionCount == tf.RegionCount +} + +func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http.Request) { + var status tableFlashReplicaStatus + err := json.NewDecoder(req.Body).Decode(&status) + if err != nil { + writeError(w, err) + return + } + do, err := session.GetDomain(h.Store.(kv.Storage)) + if err != nil { + writeError(w, err) + return + } + s, err := session.CreateSession(h.Store.(kv.Storage)) + if err != nil { + writeError(w, err) + return + } + err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable()) + if err != nil { + writeError(w, err) + } + logutil.BgLogger().Info("handle flash replica report", zap.Int64("table ID", status.ID), zap.Uint64("region count", + status.RegionCount), + zap.Uint64("flash region count", status.FlashRegionCount), + zap.Error(err)) +} + // ServeHTTP handles request of list a database or table's schemas. func (h schemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { schema, err := h.schema() diff --git a/server/http_handler_test.go b/server/http_handler_test.go index 8771d616acf83..d1e8c141610cc 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -14,6 +14,7 @@ package server import ( + "bytes" "database/sql" "encoding/base64" "encoding/json" @@ -23,6 +24,7 @@ import ( "net/http" "net/url" "sort" + "strings" "sync/atomic" "time" @@ -393,6 +395,76 @@ func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) { c.Assert(data.Value.Info.Values, IsNil) } +func (ts *HTTPHandlerTestSuite) TestTiFlashReplica(c *C) { + ts.startServer(c) + ts.prepareData(c) + defer ts.stopServer(c) + resp, err := http.Get("http://127.0.0.1:10090/tiflash/replica") + c.Assert(err, IsNil) + decoder := json.NewDecoder(resp.Body) + var data []tableFlashReplicaInfo + err = decoder.Decode(&data) + c.Assert(err, IsNil) + c.Assert(len(data), Equals, 0) + + db, err := sql.Open("mysql", getDSN()) + c.Assert(err, IsNil, Commentf("Error connecting")) + defer db.Close() + dbt := &DBTest{c, db} + + dbt.mustExec("use tidb") + dbt.mustExec("alter table test set tiflash replica 2 location labels 'a','b';") + + resp, err = http.Get("http://127.0.0.1:10090/tiflash/replica") + c.Assert(err, IsNil) + decoder = json.NewDecoder(resp.Body) + err = decoder.Decode(&data) + c.Assert(err, IsNil) + c.Assert(len(data), Equals, 1) + c.Assert(data[0].ReplicaCount, Equals, uint64(2)) + c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b") + c.Assert(data[0].Available, Equals, false) + + resp, err = http.Post("http://127.0.0.1:10090/tiflash/replica", "application/json", bytes.NewBuffer([]byte(`{"id":84,"region_count":3,"flash_region_count":3}`))) + c.Assert(err, IsNil) + c.Assert(resp, NotNil) + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "[schema:1146]Table which ID = 84 does not exist.") + + t, err := ts.domain.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test")) + c.Assert(err, IsNil) + req := fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, t.Meta().ID) + resp, err = http.Post("http://127.0.0.1:10090/tiflash/replica", "application/json", bytes.NewBuffer([]byte(req))) + c.Assert(err, IsNil) + c.Assert(resp, NotNil) + body, err = ioutil.ReadAll(resp.Body) + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "") + + resp, err = http.Get("http://127.0.0.1:10090/tiflash/replica") + c.Assert(err, IsNil) + decoder = json.NewDecoder(resp.Body) + err = decoder.Decode(&data) + c.Assert(err, IsNil) + c.Assert(len(data), Equals, 1) + c.Assert(data[0].ReplicaCount, Equals, uint64(2)) + c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b") + c.Assert(data[0].Available, Equals, true) // The status should be true now. + + // Should not take effect. + dbt.mustExec("alter table test set tiflash replica 2 location labels 'a','b';") + resp, err = http.Get("http://127.0.0.1:10090/tiflash/replica") + c.Assert(err, IsNil) + decoder = json.NewDecoder(resp.Body) + err = decoder.Decode(&data) + c.Assert(err, IsNil) + c.Assert(len(data), Equals, 1) + c.Assert(data[0].ReplicaCount, Equals, uint64(2)) + c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b") + c.Assert(data[0].Available, Equals, true) // The status should be true now. +} + func (ts *HTTPHandlerTestSuite) TestDecodeColumnValue(c *C) { ts.startServer(c) ts.prepareData(c) diff --git a/server/http_status.go b/server/http_status.go index e8730448f72bb..1710007b7dc74 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -93,6 +93,8 @@ func (s *Server) startHTTPServer() { router.Handle("/info/all", allServerInfoHandler{tikvHandlerTool}).Name("InfoALL") // HTTP path for get db and table info that is related to the tableID. router.Handle("/db-table/{tableID}", dbTableHandler{tikvHandlerTool}) + // HTTP path for get table tiflash replica info. + router.Handle("/tiflash/replica", flashReplicaHandler{tikvHandlerTool}) if s.cfg.Store == "tikv" { // HTTP path for tikv.