Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

diff: can use multiple columns to split chunks #197

Merged
merged 6 commits into from
Feb 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 218 additions & 24 deletions pkg/dbutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/types"
log "github.com/sirupsen/logrus"
)

Expand All @@ -37,6 +39,9 @@ const (
var (
// ErrVersionNotFound means can't get the database's version
ErrVersionNotFound = errors.New("can't get the database's version")

// ErrNoData means no data in table
ErrNoData = errors.New("no data found")
)

// DBConfig is database configuration.
Expand Down Expand Up @@ -164,48 +169,101 @@ func GetRowCount(ctx context.Context, db *sql.DB, schemaName string, tableName s
return cnt.Int64, nil
}

// GetRandomValues returns some random value of a column.
func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column string, num int64, min, max interface{}, limitRange string, collation string) ([]interface{}, error) {
// GetRandomValues returns some random values of a column together with the count of each value, just like sampling. Tips: limitArgs holds the values for the placeholders in limitRange.
func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column string, num int, limitRange string, limitArgs []interface{}, collation string) ([]string, []int, error) {
/*
example:
mysql> SELECT `id` FROM (SELECT `id` FROM `test`.`test` WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 AND true ORDER BY RAND() LIMIT 3)rand_tmp ORDER BY `id` COLLATE "latin1_bin";
+----------+
| rand_tmp |
+----------+
| 15 |
| 58 |
| 67 |
+----------+
mysql> SELECT `id`, COUNT(*) count FROM (SELECT `id` FROM `test`.`test` WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 ORDER BY RAND() LIMIT 5) rand_tmp GROUP BY `id` ORDER BY `id` COLLATE "latin1_bin";
+------+-------+
| id | count |
+------+-------+
| 1 | 2 |
| 2 | 2 |
| 3 | 1 |
+------+-------+

FIXME: TiDB currently doesn't return random values when using `ORDER BY RAND()`
*/

if limitRange != "" {
limitRange = "true"
if limitRange == "" {
limitRange = "TRUE"
}

if collation != "" {
collation = fmt.Sprintf(" COLLATE \"%s\"", collation)
}

randomValue := make([]interface{}, 0, num)
query := fmt.Sprintf("SELECT `%s` FROM (SELECT `%s` FROM `%s`.`%s` WHERE `%s`%s > ? AND `%s`%s < ? AND %s ORDER BY RAND() LIMIT %d)rand_tmp ORDER BY `%s`%s",
column, column, schemaName, table, column, collation, column, collation, limitRange, num, column, collation)
log.Debugf("get random values sql: %s, min: %v, max: %v", query, min, max)
rows, err := db.QueryContext(ctx, query, min, max)
randomValue := make([]string, 0, num)
valueCount := make([]int, 0, num)

query := fmt.Sprintf("SELECT %[1]s, COUNT(*) count FROM (SELECT %[1]s FROM %[2]s WHERE %[3]s ORDER BY RAND() LIMIT %[4]d)rand_tmp GROUP BY %[1]s ORDER BY %[1]s%[5]s",
escapeName(column), TableName(schemaName, table), limitRange, num, collation)
log.Debugf("get random values sql: %s, args: %v", query, limitArgs)

rows, err := db.QueryContext(ctx, query, limitArgs...)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
defer rows.Close()

for rows.Next() {
var value interface{}
err = rows.Scan(&value)
var value string
var count int
err = rows.Scan(&value, &count)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
randomValue = append(randomValue, value)
valueCount = append(valueCount, count)
}

return randomValue, nil
return randomValue, valueCount, errors.Trace(rows.Err())
}

// GetMinMaxValue returns the minimum and maximum value of the given column among the
// rows selected by limitRange.
//
// limitRange is a SQL condition placed after WHERE; an empty limitRange selects every
// row. limitArgs are the values bound to the placeholders in limitRange. When collation
// is not empty, MIN and MAX are computed on the column with that collation applied.
//
// Both values are returned as strings. ErrNoData is returned when the query yields a
// NULL minimum or maximum (for example, when no row matches). Any query, scan or
// iteration error is returned traced.
func GetMinMaxValue(ctx context.Context, db *sql.DB, schema, table, column string, limitRange string, limitArgs []interface{}, collation string) (string, string, error) {
	/*
		example:
		mysql> SELECT MIN(`id`) as MIN, MAX(`id`) as MAX FROM `test`.`testa` WHERE id > 0 AND id < 10;
		+------+------+
		| MIN  | MAX  |
		+------+------+
		|    1 |    2 |
		+------+------+
	*/

	if limitRange == "" {
		limitRange = "TRUE"
	}

	if collation != "" {
		collation = fmt.Sprintf(" COLLATE \"%s\"", collation)
	}

	query := fmt.Sprintf("SELECT /*!40001 SQL_NO_CACHE */ MIN(`%s`%s) as MIN, MAX(`%s`%s) as MAX FROM `%s`.`%s` WHERE %s",
		column, collation, column, collation, schema, table, limitRange)
	log.Debugf("GetMinMaxValue query: %v, args: %v", query, limitArgs)

	var min, max sql.NullString
	rows, err := db.QueryContext(ctx, query, limitArgs...)
	if err != nil {
		return "", "", errors.Trace(err)
	}
	defer rows.Close()

	for rows.Next() {
		if err = rows.Scan(&min, &max); err != nil {
			return "", "", errors.Trace(err)
		}
	}

	// An iteration failure leaves min/max unset; report it as the real error
	// instead of letting it be mistaken for an empty table below.
	if err = rows.Err(); err != nil {
		return "", "", errors.Trace(err)
	}

	if !min.Valid || !max.Valid {
		// don't have any data
		return "", "", ErrNoData
	}

	return min.String, max.String, nil
}

// GetTables returns name of all tables in the specified schema
Expand Down Expand Up @@ -302,8 +360,8 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(`%s`)", col.Name.O))
}

query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM `%s`.`%s` WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), schemaName, tableName, limitRange)
query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
log.Debugf("checksum sql: %s, args: %v", query, args)

var checksum sql.NullInt64
Expand All @@ -320,6 +378,130 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
return checksum.Int64, nil
}

// Bucket holds one row of bucket statistics read from TiDB: the bucket's
// count and its lower and upper bounds, kept as strings.
type Bucket struct {
	Count                  int64
	LowerBound, UpperBound string
}

// GetBucketsInfo runs SHOW STATS_BUCKETS in TiDB for the given schema and table and
// returns the buckets grouped by the Column_name field of the result.
//
// When the table has a single-column index named PRIMARY and the result holds no
// "PRIMARY" entry, the entry keyed by that column's name is moved under "PRIMARY";
// if that entry is missing as well, a not-found error is returned.
//
// Query, scan and iteration errors are returned traced, with a nil map.
func GetBucketsInfo(ctx context.Context, db *sql.DB, schema, table string, tableInfo *model.TableInfo) (map[string][]Bucket, error) {
	/*
		example in tidb:
		mysql> SHOW STATS_BUCKETS WHERE db_name= "test" AND table_name="testa";
		+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
		| Db_name | Table_name | Partition_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound         | Upper_Bound         |
		+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
		| test    | testa      |                | PRIMARY     |        1 |         0 |    64 |       1 | 1846693550524203008 | 1846838686059069440 |
		| test    | testa      |                | PRIMARY     |        1 |         1 |   128 |       1 | 1846840885082324992 | 1847056389361369088 |
		+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
	*/
	buckets := make(map[string][]Bucket)
	query := "SHOW STATS_BUCKETS WHERE db_name= ? AND table_name= ?;"
	log.Debugf("GetBucketsInfo query: %s", query)

	rows, err := db.QueryContext(ctx, query, schema, table)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return nil, errors.Trace(err)
	}

	for rows.Next() {
		var dbName, tableName, partitionName, columnName, lowerBound, upperBound sql.NullString
		var isIndex, bucketID, count, repeats sql.NullInt64

		// Newer versions add a partition_name column, so the result has 10 columns instead of 9.
		switch len(cols) {
		case 9:
			err = rows.Scan(&dbName, &tableName, &columnName, &isIndex, &bucketID, &count, &repeats, &lowerBound, &upperBound)
		case 10:
			err = rows.Scan(&dbName, &tableName, &partitionName, &columnName, &isIndex, &bucketID, &count, &repeats, &lowerBound, &upperBound)
		default:
			return nil, errors.New("Unknown struct for buckets info")
		}
		if err != nil {
			return nil, errors.Trace(err)
		}

		if _, ok := buckets[columnName.String]; !ok {
			buckets[columnName.String] = make([]Bucket, 0, 100)
		}
		buckets[columnName.String] = append(buckets[columnName.String], Bucket{
			Count:      count.Int64,
			LowerBound: lowerBound.String,
			UpperBound: upperBound.String,
		})
	}

	// Check the iteration error before post-processing: with a partially read result
	// the PRIMARY lookup below could fail and hide the real cause.
	if err = rows.Err(); err != nil {
		return nil, errors.Trace(err)
	}

	// when primary key is int type, the columnName will be column's name, not `PRIMARY`, check and transform here.
	indices := FindAllIndex(tableInfo)
	for _, index := range indices {
		if index.Name.O != "PRIMARY" {
			continue
		}
		if _, ok := buckets[index.Name.O]; ok || len(index.Columns) != 1 {
			continue
		}

		pkColumn := index.Columns[0].Name.O
		pkBuckets, found := buckets[pkColumn]
		if !found {
			return nil, errors.NotFoundf("primary key on %s in buckets info", pkColumn)
		}
		buckets[index.Name.O] = pkBuckets
		delete(buckets, pkColumn)
	}

	return buckets, nil
}

// AnalyzeValuesFromBuckets splits a bucket's upperBound or lowerBound string into one
// string value per column.
// upperBound and lowerBound look like '(123, abc)' for multiple fields, or '123' for one field.
//
// An error is returned when the number of parsed values differs from len(cols), or
// when a time-typed value that needs decoding cannot be decoded. Values of columns
// for which IsTimeTypeAndNeedDecode reports true are replaced by the result of
// DecodeTimeInBucket.
func AnalyzeValuesFromBuckets(valueString string, cols []*model.ColumnInfo) ([]string, error) {
	// Remove only the single pair of parentheses that wraps the field list. Trimming
	// every leading/trailing parenthesis would also eat parentheses that belong to
	// the first or last value, e.g. '(a, b))' must yield 'a' and 'b)'.
	// FIXME: values containing ', ' are still split incorrectly.
	vStr := strings.TrimPrefix(valueString, "(")
	vStr = strings.TrimSuffix(vStr, ")")
	values := strings.Split(vStr, ", ")
	if len(values) != len(cols) {
		return nil, errors.Errorf("analyze value %s failed", valueString)
	}

	for i, col := range cols {
		if !IsTimeTypeAndNeedDecode(col.Tp) {
			continue
		}

		value, err := DecodeTimeInBucket(values[i])
		if err != nil {
			return nil, errors.Trace(err)
		}
		values[i] = value
	}

	return values, nil
}

// DecodeTimeInBucket parses packedStr as a base-10 unsigned 64-bit integer, decodes
// it as a packed time value and returns that time's string form.
// A packed value of 0 yields an empty string and no error.
func DecodeTimeInBucket(packedStr string) (string, error) {
	packed, parseErr := strconv.ParseUint(packedStr, 10, 64)
	switch {
	case parseErr != nil:
		return "", parseErr
	case packed == 0:
		return "", nil
	}

	decoded := &types.Time{}
	if decodeErr := decoded.FromPackedUint(packed); decodeErr != nil {
		return "", decodeErr
	}

	return decoded.String(), nil
}

// GetTidbLatestTSO returns tidb's current TSO.
func GetTidbLatestTSO(ctx context.Context, db *sql.DB) (int64, error) {
/*
Expand Down Expand Up @@ -421,3 +603,15 @@ func TableName(schema, table string) string {
// escapeName doubles every backtick in name so the result can be placed between
// backticks in a SQL statement. It does not add the surrounding backticks itself.
func escapeName(name string) string {
	const backtick = "`"
	return strings.ReplaceAll(name, backtick, backtick+backtick)
}

// ReplacePlaceholder replaces each '?' in str with the corresponding element of args
// wrapped in single quotes. It is meant for building readable log output, not for
// building SQL to execute.
// tips: make sure the num of "?" is same with len(args)
func ReplacePlaceholder(str string, args []string) string {
	/*
		for example:
			str is "a > ? AND a < ?", args is {'1', '2'},
			this function will return "a > '1' AND a < '2'"
	*/
	// Escape literal '%' first: the string is used as a fmt format below, so an
	// unescaped '%' (e.g. in "a LIKE '%x%'") would be parsed as a verb and garble
	// the output.
	format := strings.Replace(str, "%", "%%", -1)
	format = strings.Replace(format, "?", "'%s'", -1)
	return fmt.Sprintf(format, utils.StringsToInterfaces(args)...)
}
71 changes: 71 additions & 0 deletions pkg/dbutil/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dbutil

import (
. "github.com/pingcap/check"
)

// TestReplacePlaceholder checks that ReplacePlaceholder substitutes every '?'
// with the matching quoted argument.
func (*testDBSuite) TestReplacePlaceholder(c *C) {
	type placeholderCase struct {
		originStr string
		args      []string
		expectStr string
	}

	cases := []placeholderCase{
		{
			originStr: "a > ? AND a < ?",
			args:      []string{"1", "2"},
			expectStr: "a > '1' AND a < '2'",
		},
		{
			originStr: "a = ? AND b = ?",
			args:      []string{"1", "2"},
			expectStr: "a = '1' AND b = '2'",
		},
	}

	for i := range cases {
		got := ReplacePlaceholder(cases[i].originStr, cases[i].args)
		c.Assert(got, Equals, cases[i].expectStr)
	}
}

// TestTableName checks that TableName quotes the schema and table with backticks
// and doubles any backtick inside a name.
func (*testDBSuite) TestTableName(c *C) {
	type tableNameCase struct {
		schema          string
		table           string
		expectTableName string
	}

	cases := []tableNameCase{
		{schema: "test", table: "testa", expectTableName: "`test`.`testa`"},
		{schema: "test-1", table: "test-a", expectTableName: "`test-1`.`test-a`"},
		{schema: "test", table: "t`esta", expectTableName: "`test`.`t``esta`"},
	}

	for i := range cases {
		got := TableName(cases[i].schema, cases[i].table)
		c.Assert(got, Equals, cases[i].expectTableName)
	}
}
Loading