Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support the TiFlash replica of table #12453

Merged
merged 27 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4ec146d
*: suport alter table set flash replica syntax
crazycs520 Sep 27, 2019
5ceeb6f
add http api
crazycs520 Sep 27, 2019
e1e46cc
add http api to report replica status
crazycs520 Sep 27, 2019
3eaa30c
add test
crazycs520 Sep 27, 2019
5debf92
fmt code
crazycs520 Sep 27, 2019
de738e8
remove check
crazycs520 Sep 28, 2019
5f0ab30
only store available status in table info
crazycs520 Oct 8, 2019
e17d7bc
Merge branch 'master' of https://github.com/pingcap/tidb into tiflash…
crazycs520 Oct 8, 2019
a99d991
Merge branch 'master' of https://github.com/pingcap/tidb into tiflash…
crazycs520 Oct 21, 2019
8826872
update parser and rename variable
crazycs520 Oct 21, 2019
3a8376a
rename
crazycs520 Oct 21, 2019
ca5f264
update parser
crazycs520 Oct 22, 2019
0d5381b
Merge branch 'master' of https://github.com/pingcap/tidb into tiflash…
crazycs520 Oct 22, 2019
05d02a6
fix typo
crazycs520 Oct 22, 2019
401df18
Merge branch 'master' into tiflash_region
crazycs520 Oct 22, 2019
a82fcff
Merge branch 'master' into tiflash_region
lzmhhh123 Oct 23, 2019
6f93629
Merge branch 'master' of https://github.com/pingcap/tidb into tiflash…
crazycs520 Oct 29, 2019
02aa042
address comment
crazycs520 Oct 29, 2019
529ebd2
Merge branch 'master' into tiflash_region
crazycs520 Oct 29, 2019
9aa9835
address comment
crazycs520 Oct 29, 2019
ff877c8
Merge branch 'tiflash_region' of https://github.com/crazycs520/tidb i…
crazycs520 Oct 29, 2019
fd30dc5
address comment
crazycs520 Oct 29, 2019
ea5275b
address comment
crazycs520 Oct 29, 2019
d6b36d0
add test and fix check
crazycs520 Oct 29, 2019
50f93cb
Merge branch 'master' into tiflash_region
crazycs520 Oct 29, 2019
76a9be5
Merge branch 'master' into tiflash_region
crazycs520 Oct 29, 2019
9f0c9d3
Merge branch 'master' into tiflash_region
crazycs520 Oct 29, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3008,6 +3008,27 @@ func (s *testDBSuite1) TestModifyColumnCharset(c *C) {

}

func (s *testDBSuite1) TestSetTableFlashReplica(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test_db")
s.mustExec(c, "drop table if exists t_flash;")
s.tk.MustExec("create table t_flash(a int, b int)")
defer s.mustExec(c, "drop table t_flash;")

t := s.testGetTable(c, "t_flash")
c.Assert(t.Meta().FlashReplica, IsNil)

s.tk.MustExec("alter table t_flash set flash replica 2 location labels 'a','b';")
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().FlashReplica, NotNil)
c.Assert(t.Meta().FlashReplica.Count, Equals, uint64(2))
c.Assert(strings.Join(t.Meta().FlashReplica.LocationLabels, ","), Equals, strings.Join([]string{"a", "b"}, ","))

s.tk.MustExec("alter table t_flash set flash replica 0")
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().FlashReplica, IsNil)
}

func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`), IsNil)
defer func() {
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ type DDL interface {
LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error
UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
58 changes: 58 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,8 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return errors.Trace(err)
}
}
case ast.AlterTableSetFlashReplica:
err = d.AlterTableSetFlashReplica(ctx, ident, spec.FlashReplica)
default:
// Nothing to do now.
}
Expand Down Expand Up @@ -3001,6 +3003,62 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// AlterTableSetFlashReplica changes the table charset and collate.
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
func (d *ddl) AlterTableSetFlashReplica(ctx sessionctx.Context, ident ast.Ident, replicaInfo *ast.FlashReplicaSpec) error {
is := d.infoHandle.Get()
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}

tb, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about alter table t_flash set tiflash replica 2 location labels 'a','a'; ?
If labels are the same, is it OK?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's OK. PD will get the same labels and deduplicate.

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionSetFlashReplica,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{*replicaInfo},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

// UpdateTableReplicaInfo updates the table flash replica infos.
func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error {
is := d.infoHandle.Get()
tb, ok := is.TableByID(tid)
if !ok {
return infoschema.ErrTableNotExists.GenWithStack("Table which ID = %d does not exist.", tid)
}

if tb.Meta().FlashReplica == nil || (tb.Meta().FlashReplica.Available == available) {
return nil
}

db, ok := is.SchemaByTable(tb.Meta())
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStack("Database of table `%s` does not exist.", tb.Meta().Name)
}

job := &model.Job{
SchemaID: db.ID,
TableID: tb.Meta().ID,
SchemaName: db.Name.L,
Type: model.ActionUpdateFlashReplicaStatus,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{available},
}
err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

// checkAlterTableCharset uses to check is it possible to change the charset of table.
// This function returns 2 variable:
// doNothing: if doNothing is true, means no need to change any more, because the target charset is same with the charset of table.
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onLockTables(t, job)
case model.ActionUnlockTable:
ver, err = onUnlockTables(t, job)
case model.ActionSetFlashReplica:
ver, err = onSetTableFlashReplica(t, job)
case model.ActionUpdateFlashReplicaStatus:
ver, err = onUpdateFlashReplicaStatus(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
54 changes: 54 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
field_types "github.com/pingcap/parser/types"
Expand Down Expand Up @@ -676,6 +677,59 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _
return ver, nil
}

func onSetTableFlashReplica(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var replicaInfo ast.FlashReplicaSpec
if err := job.DecodeArgs(&replicaInfo); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

if replicaInfo.Count > 0 {
tblInfo.FlashReplica = &model.FlashReplicaInfo{
Count: replicaInfo.Count,
LocationLabels: replicaInfo.Labels,
zimulala marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
tblInfo.FlashReplica = nil
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var available bool
if err := job.DecodeArgs(&available); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

if tblInfo.FlashReplica != nil {
tblInfo.FlashReplica.Available = available
}
zimulala marked this conversation as resolved.
Show resolved Hide resolved

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error {
// d.infoHandle maybe nil in some test.
if d.infoHandle == nil || !d.infoHandle.IsValid() {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,5 @@ require (
)

go 1.13

replace github.com/pingcap/parser => github.com/crazycs520/parser v0.0.0-20191008021851-29cebd7cfd64
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/crazycs520/parser v0.0.0-20191008021851-29cebd7cfd64 h1:oNwnqu/+pA9xgAaQgcmPo5uNGjZEssFRJ+HEUuZJjww=
github.com/crazycs520/parser v0.0.0-20191008021851-29cebd7cfd64/go.mod h1:xLjI+gnWYexq011WPMEvCNS8rFM9qe1vdojIEzSKPuc=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65 h1:hxuZop6tSoOi0sxFzoGGYdRqNrPubyaIf9KoBG9tPiE=
Expand Down
1 change: 1 addition & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,4 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
}
return false, ""
}

79 changes: 79 additions & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ type dbTableHandler struct {
*tikvHandlerTool
}

type flashReplicaHandler struct {
*tikvHandlerTool
}

// regionHandler is the common field for http handler. It contains
// some common functions for all handlers.
type regionHandler struct {
Expand Down Expand Up @@ -667,6 +671,81 @@ func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) {
binloginfo.DisableSkipBinlogFlag()
}

type tableFlashReplicaInfo struct {
ID int64 `json:"id"`
zimulala marked this conversation as resolved.
Show resolved Hide resolved
ReplicaCount uint64 `json:"replica_count"`
LocationLabels []string `json:"location_labels"`
Status bool `json:"status"`
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}

func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
h.handleStatusReport(w, req)
return
}
schema, err := h.schema()
if err != nil {
writeError(w, err)
return
}
replicaInfos := make([]*tableFlashReplicaInfo, 0)
allDBs := schema.AllSchemas()
for _, db := range allDBs {
tables := schema.SchemaTables(db.Name)
for _, tbl := range tables {
tblInfo := tbl.Meta()
if tblInfo.FlashReplica == nil || tblInfo.FlashReplica.Count == 0 {
continue
}
replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{
ID: tblInfo.ID,
ReplicaCount: tblInfo.FlashReplica.Count,
LocationLabels: tblInfo.FlashReplica.LocationLabels,
Status: tblInfo.FlashReplica.Available,
})
}
}
writeData(w, replicaInfos)
}

type tableFlashReplicaStatus struct {
ID int64 `json:"id"`
zimulala marked this conversation as resolved.
Show resolved Hide resolved
RegionCount uint64 `json:"region_count"`
FlashRegionCount uint64 `json:"flash_region_count"`
}

// checkTableFlashReplicaAvailable uses to check the available status of table flash replica.
func (tf *tableFlashReplicaStatus) checkTableFlashReplicaAvailable() bool {
return tf.FlashRegionCount == tf.RegionCount
}

func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http.Request) {
var status tableFlashReplicaStatus
err := json.NewDecoder(req.Body).Decode(&status)
if err != nil {
writeError(w, err)
return
}
do, err := session.GetDomain(h.Store.(kv.Storage))
if err != nil {
writeError(w, err)
return
}
s, err := session.CreateSession(h.Store.(kv.Storage))
if err != nil {
writeError(w, err)
return
}
err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable())
if err != nil {
writeError(w, err)
}
logutil.BgLogger().Info("handle flash replica report", zap.Int64("table ID", status.ID), zap.Uint64("region count",
status.RegionCount),
zap.Uint64("flash region count", status.FlashRegionCount),
zap.Error(err))
}

// ServeHTTP handles request of list a database or table's schemas.
func (h schemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
schema, err := h.schema()
Expand Down
60 changes: 60 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"bytes"
"database/sql"
"encoding/base64"
"encoding/json"
Expand All @@ -23,6 +24,7 @@ import (
"net/http"
"net/url"
"sort"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -393,6 +395,64 @@ func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) {
c.Assert(data.Value.Info.Values, IsNil)
}

func (ts *HTTPHandlerTestSuite) TestFlashReplica(c *C) {
ts.startServer(c)
ts.prepareData(c)
defer ts.stopServer(c)
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/flash/replica"))
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data []tableFlashReplicaInfo
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 0)

db, err := sql.Open("mysql", getDSN())
c.Assert(err, IsNil, Commentf("Error connecting"))
defer db.Close()
dbt := &DBTest{c, db}

dbt.mustExec("use tidb")
dbt.mustExec("alter table test set flash replica 2 location labels 'a','b';")

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/flash/replica"))
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Status, Equals, false)

resp, err = http.Post("http://127.0.0.1:10090/flash/replica", "application/json", bytes.NewBuffer([]byte(`{"id":84,"region_count":3,"flash_region_count":3}`)))
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(string(body), Equals, "[schema:1146]Table which ID = 84 does not exist.")

t, err := ts.domain.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test"))
c.Assert(err, IsNil)
req := fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, t.Meta().ID)
resp, err = http.Post("http://127.0.0.1:10090/flash/replica", "application/json", bytes.NewBuffer([]byte(req)))
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(string(body), Equals, "")

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/flash/replica"))
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Status, Equals, true) // The status should be true now.
}

func (ts *HTTPHandlerTestSuite) TestDecodeColumnValue(c *C) {
ts.startServer(c)
ts.prepareData(c)
Expand Down
2 changes: 2 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (s *Server) startHTTPServer() {
router.Handle("/info/all", allServerInfoHandler{tikvHandlerTool}).Name("InfoALL")
// HTTP path for get db and table info that is related to the tableID.
router.Handle("/db-table/{tableID}", dbTableHandler{tikvHandlerTool})
// HTTP path for get table flash replica info.
router.Handle("/flash/replica", flashReplicaHandler{tikvHandlerTool})

if s.cfg.Store == "tikv" {
// HTTP path for tikv.
Expand Down