Skip to content

Commit

Permalink
Merge pull request pingcap#5 from IggieWang/newidxadv
Browse files Browse the repository at this point in the history
new framework
  • Loading branch information
zouhuan1215 authored Aug 12, 2019
2 parents d9ae3a2 + 18f02a3 commit 48f6c86
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 237 deletions.
203 changes: 52 additions & 151 deletions idxadvisor/idxadvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type idxAdvPool []*IdxAdvisor
const queryChanSize int = 10000

var registeredIdxAdv = make(map[uint64]*IdxAdvisor)

var idxadvPool idxAdvPool = make(idxAdvPool, 0)

func (iap *idxAdvPool) push(ia *IdxAdvisor) {
Expand Down Expand Up @@ -65,8 +66,6 @@ func (ci CandidateIdxes) Less(i, j int) bool { return ci[i].Benefit > ci[j].Bene
func (ci CandidateIdxes) Swap(i, j int) { ci[i], ci[j] = ci[j], ci[i] }

// NewIdxAdv create a new IdxAdvisor.
// TODO: *sql.DB is not supposed to a member of IdxAdvisor.
// *sql.DB can interacts with idxadvisor by session variable
func NewIdxAdv(db *sql.DB) *IdxAdvisor {
ia := &IdxAdvisor{dbClient: db}
ia.ready.Store(false)
Expand All @@ -75,14 +74,15 @@ func NewIdxAdv(db *sql.DB) *IdxAdvisor {
return ia
}

// MockNewIdxAdv() return *IdxAdvisor without initiating dbClient member
// MockNewIdxAdv return *IdxAdvisor without initiating dbClient member
func MockNewIdxAdv() *IdxAdvisor {
ia := &IdxAdvisor{}
ia.ready.Store(false)
idxadvPool.push(ia)
return ia
}

// GetIdxAdv returns a IdxAdvisor according to connID.
func GetIdxAdv(connID uint64) *IdxAdvisor {
if ia, ok := registeredIdxAdv[connID]; ok {
return ia
Expand All @@ -96,45 +96,6 @@ func GetIdxAdv(connID uint64) *IdxAdvisor {
}
}

//// GetRecommendIdx return recommend index within given connection
//func GetRecommendIdx(connID uint64) (*CandidateIdxes, error) {
// if ia, ok := registeredIdxAdv[connID]; !ok {
// return nil, errors.New(fmt.Sprintf(
// "bad attempt to get recommend index with no registered index advisor. connID: %v\n", connID))
// } else {
// return &ia.Candidate_idx, nil
// }
//}

// GetRecommendIdxStr return recommend index in string format. Used in test
func GetRecommendIdxStr(connID uint64) (string, error) {
if ia, ok := registeredIdxAdv[connID]; !ok {
return "", errors.New(fmt.Sprintf(
"bad attempt to get recommend index with no registered index advisor. connID: %v\n", connID))
} else {
idxes := ia.Candidate_idx
if len(idxes) == 0 {
return "", nil
}

var idxesStr string
for _, idx := range idxes {
var idxStr string
idxStr = fmt.Sprintf("%s: (", idx.Index.Index.Table.L)
cols := idx.Index.Index.Columns
colLen := len(cols)

for i := 0; i < len(cols)-1; i++ {
idxStr = fmt.Sprintf("%s%s ", idxStr, cols[i].Name.L)
}

idxStr = fmt.Sprintf("%s%s)", idxStr, cols[colLen-1].Name.L)
idxesStr = fmt.Sprintf("%s%s,", idxesStr, idxStr)
}
return idxesStr[:len(idxesStr)-1], nil
}
}

// Init set session variable tidb_enable_index_advisor = true
func (ia *IdxAdvisor) Init() error {
ia.queryChan = make(chan string, queryChanSize)
Expand Down Expand Up @@ -168,7 +129,7 @@ func (ia *IdxAdvisor) StartTask(sqlFile string) {
query, ok := <-ia.queryChan
if !ok {
// No more query
WriteFinaleResult()
WriteFinalResult()
return
}
fmt.Printf("**************************************[%v]******************************************\n", cnt)
Expand All @@ -177,93 +138,6 @@ func (ia *IdxAdvisor) StartTask(sqlFile string) {
}
}

//func readQuery(sqlFile *string, queryChan chan string) {
// fd, _ := os.Open(*sqlFile)
// defer func() {
// fd.Close()
// close(queryChan)
// }()
//
// scanner := bufio.NewScanner(fd)
//
// // TODO: more efficient way to extract select statement from file
// maxCap := bufio.MaxScanTokenSize
// buf := make([]byte, maxCap)
// scanner.Buffer(buf, maxCap)
// split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
// // Define a split function that separates on "--"
// for i := 0; i < len(data)-1; i++ {
// if data[i] == 0x2d && data[i+1] == 0x2d {
// return i + 2, data[:i], nil
//
// }
//
// }
// return 0, data, bufio.ErrFinalToken
// }
// scanner.Split(split)
//
// // Scan
// cnt := 1
// for scanner.Scan() {
// contents := scanner.Text()
// // fmt.Printf("================================[%v]==================================\n", cnt)
// // fmt.Printf("%v\n", contents)
// sqlBegin := strings.Index(string(contents), "select")
// query := contents[sqlBegin : len(contents)-1]
// queryChan <- query
// cnt++
// }
//
// if err := scanner.Err(); err != nil {
// fmt.Fprintln(os.Stderr, "reading input:", err)
// }
//}

func readQuery(sqlFile string, queryChan chan string) {
defer func() {
close(queryChan)
}()

// If readQuery is called in idxadv_test.go, return immediately
if sqlFile == "test-mode" {
return
}

files, err := ioutil.ReadDir(sqlFile)
if err != nil {
panic(err)
}

n := len(files)

for i := 1; i <= n; i++ {
sqlfile := sqlFile + strconv.Itoa(i) + ".sql"

contents, err := ioutil.ReadFile(sqlfile)
if err != nil {
panic(err)
}
// sqlBegin := strings.Index(string(contents), "select")
// query := contents[sqlBegin:]
queryChan <- string(contents)
}
}

/*
// StartTask start handling queries in idxadv mode after session variable tidb_enable_index_advisor has been set
func (ia *IdxAdvisor) StartTask(query string) {
if ia.IsReady() {
// var err error
sqlFile := "/tmp/queries"
queries := readQuery(&sqlFile)
for i, query := range queries {
fmt.Printf("$$$$$$$$$$$$$$$$$$$$$$[%v]$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\n", i+1)
ia.dbClient.Exec(query)
}
}
}
*/
func GetVirtualInfoschema(is infoschema.InfoSchema, dbName string, tableInfoSets map[string]*plannercore.TableInfoSets) infoschema.InfoSchema {
// Get a copy of InfoSchema
dbInfos := is.Clone()
Expand All @@ -280,7 +154,7 @@ func GetVirtualInfoschema(is infoschema.InfoSchema, dbName string, tableInfoSets
idxInfo := tblCopy.Meta().Indices

// add virtual indexes to InfoSchemaCopy.TblInfo
virtualIndexes := BuildVirtualIndexes(tblInfoCopy, dbname, tblname, tblInfoSets)
virtualIndexes := buildVirtualIndexes(tblInfoCopy, dbname, tblname, tblInfoSets)
for _, virtualIndex := range virtualIndexes {
if !isExistedInTable(virtualIndex, idxInfo) {
tblInfoCopy.Indices = append(tblInfoCopy.Indices, virtualIndex)
Expand All @@ -290,14 +164,14 @@ func GetVirtualInfoschema(is infoschema.InfoSchema, dbName string, tableInfoSets
return ISCopy
}

func BuildVirtualIndexes(tblInfo *model.TableInfo, dbname, tblname model.CIStr, tblInfoSets *plannercore.TableInfoSets) []*model.IndexInfo {
indexes := GenVirtualIndexCols(tblInfo, dbname, tblname, tblInfoSets)
func buildVirtualIndexes(tblInfo *model.TableInfo, dbname, tblname model.CIStr, tblInfoSets *plannercore.TableInfoSets) []*model.IndexInfo {
indexes := genVirtualIndexCols(tblInfo, dbname, tblname, tblInfoSets)
result := make([]*model.IndexInfo, 0)
for i, idxColNames := range indexes {
indexName := model.NewCIStr("vIndex" + string(i))
indexinfo, err := ddl.BuildIndexInfo(tblInfo, indexName, idxColNames, model.StatePublic)
if err != nil {
fmt.Printf("BuildVirtualIndexes error: %v!\n", err)
fmt.Printf("buildVirtualIndexes error: %v!\n", err)
var idxColNameStr string
for _, idxCol := range idxColNames {
idxColNameStr = fmt.Sprintf("%v %v", idxColNameStr, idxCol)
Expand All @@ -310,14 +184,14 @@ func BuildVirtualIndexes(tblInfo *model.TableInfo, dbname, tblname model.CIStr,
return result
}

func GenVirtualIndexCols(tblInfo *model.TableInfo, dbname, tblname model.CIStr, tblInfoSets *plannercore.TableInfoSets) [][]*ast.IndexColName {
func genVirtualIndexCols(tblInfo *model.TableInfo, dbname, tblname model.CIStr, tblInfoSets *plannercore.TableInfoSets) [][]*ast.IndexColName {
columnInfos := tblInfo.Columns
var result [][]*ast.IndexColName

// one column
for _, columnInfo := range columnInfos {
idxCols := make([]*ast.IndexColName, 1, 1)
idxCols[0] = BuildIdxColNameFromColInfo(columnInfo, dbname, tblname)
idxCols[0] = buildIdxColNameFromColInfo(columnInfo, dbname, tblname)
result = append(result, idxCols)
}

Expand All @@ -327,8 +201,8 @@ func GenVirtualIndexCols(tblInfo *model.TableInfo, dbname, tblname model.CIStr,
for j := 0; j < nCols; j++ {
if i != j {
idxTwoCols := make([]*ast.IndexColName, 2, 2)
idxTwoCols[0] = BuildIdxColNameFromColInfo(columnInfos[i], dbname, tblname)
idxTwoCols[1] = BuildIdxColNameFromColInfo(columnInfos[j], dbname, tblname)
idxTwoCols[0] = buildIdxColNameFromColInfo(columnInfos[i], dbname, tblname)
idxTwoCols[1] = buildIdxColNameFromColInfo(columnInfos[j], dbname, tblname)
result = append(result, idxTwoCols)
}
}
Expand Down Expand Up @@ -391,12 +265,11 @@ func GenVirtualIndexCols(tblInfo *model.TableInfo, dbname, tblname model.CIStr,
}
}
if isExisted {
idxCols = append(idxCols, BuildIdxColNameFromColInfo(columnInfo, dbname, tblname))
idxCols = append(idxCols, buildIdxColNameFromColInfo(columnInfo, dbname, tblname))
}
}
result = append(result, idxCols)
}

return result
}

Expand All @@ -412,26 +285,18 @@ func addToCandidateCols(readyCols []model.CIStr, cols *[]model.CIStr, candidateC
}
}

func BuildIdxColNameFromColInfo(colInfo *model.ColumnInfo, dbname, tblname model.CIStr) *ast.IndexColName {
func buildIdxColNameFromColInfo(colInfo *model.ColumnInfo, dbname, tblname model.CIStr) *ast.IndexColName {
idxColName := &ast.IndexColName{}
idxColName.Column = &ast.ColumnName{Schema: dbname, Table: tblname, Name: colInfo.Name}
idxColName.Length = -1
return idxColName
}

func GenIndexCols(index *model.IndexInfo) []model.CIStr {
cols := []model.CIStr{}
for _, idxColumn := range index.Columns {
cols = append(cols, idxColumn.Name)
}
return cols
}

func isExistedInTable(virtualIndex *model.IndexInfo, indices []*model.IndexInfo) bool {
is := false
virtualIndexCols := GenIndexCols(virtualIndex)
virtualIndexCols := genIndexCols(virtualIndex)
for _, idx := range indices {
indexCols := GenIndexCols(idx)
indexCols := genIndexCols(idx)
if reflect.DeepEqual(virtualIndexCols, indexCols) {
is = true
break
Expand All @@ -440,6 +305,14 @@ func isExistedInTable(virtualIndex *model.IndexInfo, indices []*model.IndexInfo)
return is
}

func genIndexCols(index *model.IndexInfo) []model.CIStr {
cols := []model.CIStr{}
for _, idxColumn := range index.Columns {
cols = append(cols, idxColumn.Name)
}
return cols
}

func (ia *IdxAdvisor) addCandidate(virtualIdx *CandidateIdx) {
in := false
for _, candidateIdx := range ia.Candidate_idx {
Expand All @@ -454,3 +327,31 @@ func (ia *IdxAdvisor) addCandidate(virtualIdx *CandidateIdx) {
ia.Candidate_idx = append(ia.Candidate_idx, virtualIdx)
}
}

func readQuery(sqlFile string, queryChan chan string) {
defer func() {
close(queryChan)
}()

// If readQuery is called in idxadv_test.go, return immediately
if sqlFile == "test-mode" {
return
}

files, err := ioutil.ReadDir(sqlFile)
if err != nil {
panic(err)
}

n := len(files)

for i := 1; i <= n; i++ {
sqlfile := sqlFile + strconv.Itoa(i) + ".sql"

contents, err := ioutil.ReadFile(sqlfile)
if err != nil {
panic(err)
}
queryChan <- string(contents)
}
}
Loading

0 comments on commit 48f6c86

Please sign in to comment.