Skip to content

Commit

Permalink
feature: engine plugin and csv dashbase example. (pingcap#5)
Browse files Browse the repository at this point in the history
* first commit
1. plugin add Engine
2. show engine && show table status is work right with plugin
3. csv reader example

* code support OnReaderNext and OnReaderOpen

* plugin support selection
  • Loading branch information
wph95 authored and lfkdsk committed Oct 26, 2019
1 parent 39340ce commit e652752
Show file tree
Hide file tree
Showing 20 changed files with 471 additions and 55 deletions.
26 changes: 26 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/pingcap/tidb/plugin"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -1370,6 +1371,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
referIdent := ast.Ident{Schema: s.ReferTable.Schema, Name: s.ReferTable.Name}
return d.CreateTableWithLike(ctx, ident, referIdent, s.IfNotExists)
}

is := d.GetInfoSchemaWithInterceptor(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
Expand All @@ -1388,6 +1390,17 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
if err != nil {
return errors.Trace(err)
}

engine := findTableOption(s.Options, ast.TableOptionEngine, "InnoDB")
var p *plugin.Plugin
if engine != "InnoDB" {
p = plugin.Get(plugin.Engine, engine)
if p == nil {
return infoschema.ErrorEngineError.GenWithStackByArgs(404)
}
tbInfo.Engine = engine
}

tbInfo.State = model.StatePublic
err = checkTableInfoValid(tbInfo)
if err != nil {
Expand Down Expand Up @@ -1759,6 +1772,19 @@ func resolveDefaultTableCharsetAndCollation(tbInfo *model.TableInfo, dbCharset s
return
}

func findTableOption(options []*ast.TableOption, tp ast.TableOptionType, _default string) string {
value := _default
for i := len(options) - 1; i >= 0; i-- {
op := options[i]
if op.Tp == tp {
// find the last one.
value = op.StrValue
break
}
}
return value
}

func findTableOptionCharset(options []*ast.TableOption) string {
var tableCharset string
for i := len(options) - 1; i >= 0; i-- {
Expand Down
16 changes: 7 additions & 9 deletions executor/admin_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package executor
import (
"context"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/util/chunk"
)

Expand All @@ -41,12 +39,12 @@ func (e *AdminPluginsExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}

func (e *AdminPluginsExec) changeDisableFlagAndFlush(disabled bool) error {
dom := domain.GetDomain(e.ctx)
for _, pluginName := range e.Plugins {
err := plugin.ChangeDisableFlagAndFlush(dom, pluginName, disabled)
if err != nil {
return err
}
}
//dom := domain.GetDomain(e.ctx)
//for _, pluginName := range e.Plugins {
// err := plugin.ChangeDisableFlagAndFlush(dom, pluginName, disabled)
// if err != nil {
// return err
// }
//}
return nil
}
29 changes: 27 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/pingcap/tidb/plugin"
"math"
"sort"
"strings"
Expand Down Expand Up @@ -208,6 +209,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildSQLBindExec(v)
case *plannercore.SplitRegion:
return b.buildSplitRegion(v)
case *plannercore.PhysicalTableScan:
return b.buildTableScan(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
Expand Down Expand Up @@ -1923,16 +1926,38 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return e, nil
}

func (b *executorBuilder) buildTableScan(v *plannercore.PhysicalTableScan) Executor {

plugin.Get(plugin.Engine, "csv")
return &PluginScanExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Plugin: plugin.Get(plugin.Engine, "csv"),
Table: v.Table,
Columns: v.Columns,
}
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *TableReaderExecutor {
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor {
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if ts.Table.Engine == "csv" {
if len(v.TablePlans) == 2 {
if tSelect, ok:=v.TablePlans[1].(*plannercore.PhysicalSelection);ok{
return b.buildSelection(tSelect)
}

}

return b.buildTableScan(ts)
}

ret, err := buildNoRangeTableReader(b, v)
if err != nil {
b.err = err
return nil
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
Expand Down
39 changes: 39 additions & 0 deletions executor/plugin_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package executor

import (
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)

type PluginScanExecutor struct {
baseExecutor
Table *model.TableInfo
Columns []*model.ColumnInfo
Plugin *plugin.Plugin
pm *plugin.EngineManifest
meta *plugin.ExecutorMeta
}

func (e *PluginScanExecutor) Open(ctx context.Context) error {
e.pm = plugin.DeclareEngineManifest(e.Plugin.Manifest)
e.meta = &plugin.ExecutorMeta{
Table: e.Table,
}
e.pm.OnReaderOpen(ctx, e.meta)
return nil
}

func (e *PluginScanExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
err := e.pm.OnReaderNext(ctx, chk, e.meta)
fmt.Println("pe next finished", spew.Sdump(err))
return err
}

func (e *PluginScanExecutor) Close() error {
return nil
}
15 changes: 7 additions & 8 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -987,13 +986,13 @@ func (e *SimpleExec) executeFlush(s *ast.FlushStmt) error {
err = dom.PrivilegeHandle().Update(ctx.(sessionctx.Context))
return err
case ast.FlushTiDBPlugin:
dom := domain.GetDomain(e.ctx)
for _, pluginName := range s.Plugins {
err := plugin.NotifyFlush(dom, pluginName)
if err != nil {
return err
}
}
//dom := domain.GetDomain(e.ctx)
//for _, pluginName := range s.Plugins {
// err := plugin.NotifyFlush(dom, pluginName)
// if err != nil {
// return err
// }
//}
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ var (
ErrTableLocked = terror.ClassSchema.New(codeTableLocked, mysql.MySQLErrName[mysql.ErrTableLocked])
// ErrAccessDenied return when the user doesn't have the permission to access the table.
ErrAccessDenied = terror.ClassSchema.New(codeErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDenied])
// ErrAccessDenied return when the user use custom engine have errors.
ErrorEngineError = terror.ClassSchema.New(codeEngineError, "Got error %d from storage engine")
)

// InfoSchema is the interface used to retrieve the schema information.
Expand Down Expand Up @@ -339,6 +341,8 @@ const (
codeKeyNameDuplicate = 1061
codeKeyNotExists = 1176

codeEngineError = mysql.ErrGetErrno

codeErrTableNotLockedForWrite = mysql.ErrTableNotLockedForWrite
codeErrTableNotLocked = mysql.ErrTableNotLocked
codeErrNonuniqTable = mysql.ErrNonuniqTable
Expand Down
22 changes: 21 additions & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/pingcap/tidb/plugin"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -969,6 +970,25 @@ func dataForEngines() (records [][]types.Datum) {
"YES", // Savepoints
),
)

thirdEngine := plugin.List(plugin.Engine)
for _, e := range thirdEngine {

records = append(records,
types.MakeDatums(
e.Name, // Engine
"YES", // Support
"Created By Plugin :)", // Comment
"NO", // Transactions
"NO", // XA
"NO", // Savepoints
),
)

}



return records
}

Expand Down Expand Up @@ -1268,7 +1288,7 @@ func dataForTables(ctx sessionctx.Context, schemas []*model.DBInfo) ([][]types.D
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
"BASE TABLE", // TABLE_TYPE
"InnoDB", // ENGINE
table.Engine, // ENGINE
uint64(10), // VERSION
"Compact", // ROW_FORMAT
rowCount, // TABLE_ROWS
Expand Down
3 changes: 3 additions & 0 deletions plugin/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
Schema
// Daemon indicate a plugin that can run as daemon task.
Daemon
Engine
// UDF indicate a plugin that can add user define func to tidb.
UDF
)
Expand All @@ -39,6 +40,8 @@ func (k Kind) String() (str string) {
str = "Schema"
case Daemon:
str = "Daemon"
case Engine:
str = "Engine"
case UDF:
str = "UDF"
}
Expand Down
66 changes: 66 additions & 0 deletions plugin/csv/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/util/chunk"
)

type ReadExecutor struct {
pos int
}

var Files = make(map[string]*ReadExecutor)

// Validate implements TiDB plugin's Validate SPI.
func Validate(ctx context.Context, m *plugin.Manifest) error {
fmt.Println("csv plugin validate")
return nil
}

// OnInit implements TiDB plugin's OnInit SPI.
func OnInit(ctx context.Context, manifest *plugin.Manifest) error {
fmt.Println("csv init called")
return nil
}

// OnShutdown implements TiDB plugin's OnShutdown SPI.
func OnShutdown(ctx context.Context, manifest *plugin.Manifest) error {
fmt.Println("csv shutdown called")
return nil
}

func OnReaderOpen(ctx context.Context, meta *plugin.ExecutorMeta) {
Files[meta.Table.Name.L] = &ReadExecutor{
pos: 0,
}
}

func OnReaderNext(ctx context.Context, chk *chunk.Chunk, meta *plugin.ExecutorMeta) error {
if _, ok := Files[meta.Table.Name.L]; !ok {
fmt.Println("have some problem")
return nil
}
e := Files[meta.Table.Name.L]
if e.pos > 5 {
return nil
}
chk.AppendInt64(0, int64(e.pos))
chk.AppendString(1, "233333")
e.pos += 1
return nil
}
Loading

0 comments on commit e652752

Please sign in to comment.