update pkg about database (pingcap#142)
WangXiangUSTC authored and WangXiangUSTC committed Feb 11, 2019
1 parent db3a058 commit e819b7f
Showing 8 changed files with 437 additions and 9 deletions.
192 changes: 190 additions & 2 deletions pkg/dbutil/common.go
@@ -23,6 +23,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/types"
log "github.com/sirupsen/logrus"
)

@@ -37,6 +39,9 @@ const (
var (
// ErrVersionNotFound means can't get the database's version
ErrVersionNotFound = errors.New("can't get the database's version")

// ErrNoData means no data in table
ErrNoData = errors.New("no data found")
)

// DBConfig is database configuration.
@@ -208,6 +213,52 @@ func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column
return randomValue, nil
}

// GetMinMaxValue returns the min and max values of the given column, restricted by the limitRange condition.
func GetMinMaxValue(ctx context.Context, db *sql.DB, schema, table, column string, limitRange string, collation string, args []interface{}) (string, string, error) {
/*
example:
mysql> SELECT MIN(`id`) as MIN, MAX(`id`) as MAX FROM `test`.`testa` WHERE id > 0 AND id < 10;
+------+------+
| MIN | MAX |
+------+------+
| 1 | 2 |
+------+------+
*/

if limitRange == "" {
limitRange = "true"
}

if collation != "" {
collation = fmt.Sprintf(" COLLATE \"%s\"", collation)
}

query := fmt.Sprintf("SELECT /*!40001 SQL_NO_CACHE */ MIN(`%s`%s) as MIN, MAX(`%s`%s) as MAX FROM `%s`.`%s` WHERE %s",
column, collation, column, collation, schema, table, limitRange)
log.Debugf("GetMinMaxValue query: %v, args: %v", query, args)

var min, max sql.NullString
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return "", "", errors.Trace(err)
}
defer rows.Close()

for rows.Next() {
err = rows.Scan(&min, &max)
if err != nil {
return "", "", errors.Trace(err)
}
}

if !min.Valid || !max.Valid {
// don't have any data
return "", "", ErrNoData
}

return min.String, max.String, errors.Trace(rows.Err())
}
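
For orientation, a minimal caller sketch of the new helper. The DSN, schema, table, and column names are hypothetical, and the MySQL driver import is an assumption; this is not part of the commit.

package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed driver, any MySQL-protocol endpoint works
	"github.com/pingcap/tidb-tools/pkg/dbutil"
)

func main() {
	// hypothetical DSN pointing at a TiDB/MySQL instance
	db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// min/max of `id` in `test`.`testa` where id > 0 (hypothetical names)
	min, max, err := dbutil.GetMinMaxValue(context.Background(), db, "test", "testa", "id", "id > ?", "", []interface{}{0})
	if err == dbutil.ErrNoData {
		log.Println("no rows match the condition")
		return
	}
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("min=%s max=%s", min, max)
}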

// GetTables returns name of all tables in the specified schema
func GetTables(ctx context.Context, db *sql.DB, schemaName string) (tables []string, err error) {
/*
@@ -302,8 +353,8 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(`%s`)", col.Name.O))
}

query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM `%s`.`%s` WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), schemaName, tableName, limitRange)
query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
log.Debugf("checksum sql: %s, args: %v", query, args)

var checksum sql.NullInt64
@@ -320,6 +371,130 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
return checksum.Int64, nil
}

// Bucket saves the bucket information from TiDB.
type Bucket struct {
Count int64
LowerBound string
UpperBound string
}

// GetBucketsInfo returns the result of `SHOW STATS_BUCKETS` in TiDB, grouped by column or index name.
func GetBucketsInfo(ctx context.Context, db *sql.DB, schema, table string, tableInfo *model.TableInfo) (map[string][]Bucket, error) {
/*
example in tidb:
mysql> SHOW STATS_BUCKETS WHERE db_name= "test" AND table_name="testa";
+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
| Db_name | Table_name | Partition_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound | Upper_Bound |
+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
| test | testa | | PRIMARY | 1 | 0 | 64 | 1 | 1846693550524203008 | 1846838686059069440 |
| test | testa | | PRIMARY | 1 | 1 | 128 | 1 | 1846840885082324992 | 1847056389361369088 |
+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
*/
buckets := make(map[string][]Bucket)
query := "SHOW STATS_BUCKETS WHERE db_name= ? AND table_name= ?;"
log.Debugf("GetBucketsInfo query: %s", query)

rows, err := db.QueryContext(ctx, query, schema, table)
if err != nil {
return nil, errors.Trace(err)
}
defer rows.Close()

cols, err := rows.Columns()
if err != nil {
return nil, errors.Trace(err)
}

for rows.Next() {
var dbName, tableName, partitionName, columnName, lowerBound, upperBound sql.NullString
var isIndex, bucketID, count, repeats sql.NullInt64

// newer TiDB versions add a partition_name column
switch len(cols) {
case 9:
err = rows.Scan(&dbName, &tableName, &columnName, &isIndex, &bucketID, &count, &repeats, &lowerBound, &upperBound)
case 10:
err = rows.Scan(&dbName, &tableName, &partitionName, &columnName, &isIndex, &bucketID, &count, &repeats, &lowerBound, &upperBound)
default:
return nil, errors.New("Unknown struct for buckets info")
}
if err != nil {
return nil, errors.Trace(err)
}

if _, ok := buckets[columnName.String]; !ok {
buckets[columnName.String] = make([]Bucket, 0, 100)
}
buckets[columnName.String] = append(buckets[columnName.String], Bucket{
Count: count.Int64,
LowerBound: lowerBound.String,
UpperBound: upperBound.String,
})
}

// when the primary key is an integer, columnName is the column's name rather than `PRIMARY`; check and transform it here.
indices := FindAllIndex(tableInfo)
for _, index := range indices {
if index.Name.O != "PRIMARY" {
continue
}
_, ok := buckets[index.Name.O]
if !ok && len(index.Columns) == 1 {
if _, ok := buckets[index.Columns[0].Name.O]; !ok {
return nil, errors.NotFoundf("primary key on %s in buckets info", index.Columns[0].Name.O)
}
buckets[index.Name.O] = buckets[index.Columns[0].Name.O]
delete(buckets, index.Columns[0].Name.O)
}
}

return buckets, errors.Trace(rows.Err())
}
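
For reference, a hedged sketch of a caller that walks the histogram buckets per column. The schema and table names are hypothetical, tableInfo is assumed to have been loaded elsewhere, and the imports (dbutil, github.com/pingcap/parser/model, github.com/pingcap/errors, logrus) match the ones used in this file.

func dumpBuckets(ctx context.Context, db *sql.DB, tableInfo *model.TableInfo) error {
	// tableInfo is assumed to describe `test`.`testa`
	buckets, err := dbutil.GetBucketsInfo(ctx, db, "test", "testa", tableInfo)
	if err != nil {
		return errors.Trace(err)
	}
	for col, bs := range buckets {
		for _, b := range bs {
			log.Infof("column %s: count=%d bound=[%s, %s]", col, b.Count, b.LowerBound, b.UpperBound)
		}
	}
	return nil
}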

// AnalyzeValuesFromBuckets parses an upperBound or lowerBound string into one value per column.
// upperBound and lowerBound look like '(123, abc)' for multiple fields, or '123' for a single field.
func AnalyzeValuesFromBuckets(valueString string, cols []*model.ColumnInfo) ([]string, error) {
// FIXME: some values may themselves contain '(', ')' or ', '
vStr := strings.Trim(valueString, "()")
values := strings.Split(vStr, ", ")
if len(values) != len(cols) {
return nil, errors.Errorf("analyze value %s failed", valueString)
}

for i, col := range cols {
if IsTimeTypeAndNeedDecode(col.Tp) {
value, err := DecodeTimeInBucket(values[i])
if err != nil {
return nil, errors.Trace(err)
}

values[i] = value
}
}

return values, nil
}
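
A brief sketch of decoding a bucket boundary back into per-column values. The composite bound "(123, abc)" and the two column infos are hypothetical; imports are the same as in the sketch above.

func decodeBoundExample(colA, colB *model.ColumnInfo) ([]string, error) {
	// a composite index on (a, b) could yield the bound "(123, abc)";
	// AnalyzeValuesFromBuckets splits it into one value per column and
	// decodes packed time columns via DecodeTimeInBucket.
	return dbutil.AnalyzeValuesFromBuckets("(123, abc)", []*model.ColumnInfo{colA, colB})
}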

// DecodeTimeInBucket decodes Time from a packed uint64 value.
func DecodeTimeInBucket(packedStr string) (string, error) {
packed, err := strconv.ParseUint(packedStr, 10, 64)
if err != nil {
return "", err
}

if packed == 0 {
return "", nil
}

t := new(types.Time)
err = t.FromPackedUint(packed)
if err != nil {
return "", err
}

return t.String(), nil
}

// GetTidbLatestTSO returns tidb's current TSO.
func GetTidbLatestTSO(ctx context.Context, db *sql.DB) (int64, error) {
/*
@@ -421,3 +596,16 @@ func TableName(schema, table string) string {
func escapeName(name string) string {
return strings.Replace(name, "`", "``", -1)
}

// ReplacePlaceholder replaces every '?' in str with the corresponding value from args; intended for logging.
// tip: make sure the number of '?' placeholders equals len(args)
func ReplacePlaceholder(str string, args []string) string {
/*
for example:
str is "a > ? AND a < ?", args is {'1', '2'},
this function will return "a > '1' AND a < '2'"
*/
newStr := strings.Replace(str, "?", "'%s'", -1)
return fmt.Sprintf(newStr, utils.StringsToInterfaces(args)...)

}
71 changes: 71 additions & 0 deletions pkg/dbutil/common_test.go
@@ -0,0 +1,71 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dbutil

import (
. "github.com/pingcap/check"
)

func (*testDBSuite) TestReplacePlaceholder(c *C) {
testCases := []struct {
originStr string
args []string
expectStr string
}{
{
"a > ? AND a < ?",
[]string{"1", "2"},
"a > '1' AND a < '2'",
}, {
"a = ? AND b = ?",
[]string{"1", "2"},
"a = '1' AND b = '2'",
},
}

for _, testCase := range testCases {
str := ReplacePlaceholder(testCase.originStr, testCase.args)
c.Assert(str, Equals, testCase.expectStr)
}

}

func (*testDBSuite) TestTableName(c *C) {
testCases := []struct {
schema string
table string
expectTableName string
}{
{
"test",
"testa",
"`test`.`testa`",
},
{
"test-1",
"test-a",
"`test-1`.`test-a`",
},
{
"test",
"t`esta",
"`test`.`t``esta`",
},
}

for _, testCase := range testCases {
tableName := TableName(testCase.schema, testCase.table)
c.Assert(tableName, Equals, testCase.expectTableName)
}
}
48 changes: 46 additions & 2 deletions pkg/dbutil/index.go
@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"sort"
"strconv"

"github.com/pingcap/errors"
@@ -67,12 +68,12 @@ func ShowIndex(ctx context.Context, db *sql.DB, schemaName string, table string)
return indices, nil
}

// FindSuitableIndex returns first column of a suitable index.
// FindSuitableColumnWithIndex returns first column of a suitable index.
// The priority is
// * primary key
// * unique key
// * normal index which has max cardinality
func FindSuitableIndex(ctx context.Context, db *sql.DB, schemaName string, tableInfo *model.TableInfo) (*model.ColumnInfo, error) {
func FindSuitableColumnWithIndex(ctx context.Context, db *sql.DB, schemaName string, tableInfo *model.TableInfo) (*model.ColumnInfo, error) {
// find primary key
for _, index := range tableInfo.Indices {
if index.Primary {
@@ -113,6 +114,49 @@ func FindSuitableIndex(ctx context.Context, db *sql.DB, schemaName string, table
return c, nil
}

// FindAllIndex returns all indices, ordered as primary key, unique keys, then normal indices.
func FindAllIndex(tableInfo *model.TableInfo) []*model.IndexInfo {
indices := make([]*model.IndexInfo, len(tableInfo.Indices))
copy(indices, tableInfo.Indices)
sort.SliceStable(indices, func(i, j int) bool {
a := indices[i]
b := indices[j]
switch {
case b.Primary:
return false
case a.Primary:
return true
case b.Unique:
return false
case a.Unique:
return true
default:
return false
}
})
return indices
}
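
sort.SliceStable keeps the original relative order inside each class, so the comparator above yields the primary key first, then unique keys, then the remaining indices in their original order. A hedged illustration with hypothetical index names, assuming the same imports as the sketches above:

func logIndexOrder(tableInfo *model.TableInfo) {
	// e.g. for Indices = [idx_c (normal), uk_b (unique), PRIMARY] the expected
	// output order is PRIMARY, uk_b, idx_c.
	for _, idx := range dbutil.FindAllIndex(tableInfo) {
		log.Infof("index %s: primary=%v unique=%v", idx.Name.O, idx.Primary, idx.Unique)
	}
}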

// FindAllColumnWithIndex returns all indexed columns, ordered as primary key, unique keys, then normal indices.
func FindAllColumnWithIndex(tableInfo *model.TableInfo) []*model.ColumnInfo {
colsMap := make(map[string]interface{})
cols := make([]*model.ColumnInfo, 0, 2)

for _, index := range FindAllIndex(tableInfo) {
// indices are guaranteed to be visited in order PK -> UK -> normal index
for _, indexCol := range index.Columns {
col := FindColumnByName(tableInfo.Columns, indexCol.Name.O)
if _, ok := colsMap[col.Name.O]; ok {
continue
}
colsMap[col.Name.O] = struct{}{}
cols = append(cols, col)
}
}

return cols
}

// SelectUniqueOrderKey returns some columns for order by condition.
func SelectUniqueOrderKey(tbInfo *model.TableInfo) ([]string, []*model.ColumnInfo) {
keys := make([]string, 0, 2)