Skip to content
This repository has been archived by the owner on Dec 9, 2024. It is now read-only.

Add a workload that insert in new partitions and drop old partitions #425

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ build: bindir consistency isolation pocket on-dup sqllogic block-writer \
region-available crud \
read-stress follower-read pessimistic resolve-lock cdc-bank \
example ttl \
partition-write \
# +tipocket:scaffold:makefile_build

bindir:
Expand Down Expand Up @@ -134,6 +135,10 @@ ttl:
cd testcase/ttl ; make build; \
cp bin/* ../../bin/

partition-write:
cd testcase/partition-write; make build; \
cp bin/* ../../bin/

# +tipocket:scaffold:makefile_build_cmd

tipocket:
Expand Down
12 changes: 12 additions & 0 deletions run/lib/case.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,17 @@
[
'/bin/tll',
],
'partition-write'(args={})::
[
'/bin/partition-write',
],
'partition-write'(args={})::
[
'/bin/partition-write',
],
'partition-write'(args={})::
[
'/bin/partition-write',
],
// +tipocket:scaffold:case_decls
}
11 changes: 11 additions & 0 deletions run/workflow/partition-write.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
_config+:: {
case_name: 'partition-write',
image_name: 'hub.pingcap.net/qa/tipocket',
args+: {
// k8s configurations
// 'storage-class': 'local-storage',
},
command: {},
},
}
60 changes: 60 additions & 0 deletions testcase/partition-write/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

GOARCH := $(if $(GOARCH),$(GOARCH),amd64)
GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) GO111MODULE=on go
GOTEST=GO15VENDOREXPERIMENT="1" CGO_ENABLED=1 GO111MODULE=on go test # go race detector requires cgo
VERSION := $(if $(VERSION),$(VERSION),latest)

PACKAGES := go list ./...| grep -vE 'vendor'

LDFLAGS += -s -w
LDFLAGS += -X "github.com/pingcap/tipocket/pkg/test-infra/fixture.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/tipocket/pkg/test-infra/fixture.BuildHash=$(shell git rev-parse HEAD)"

GOBUILD=$(GO) build -ldflags '$(LDFLAGS)'

DOCKER_REGISTRY_PREFIX := $(if $(DOCKER_REGISTRY),$(DOCKER_REGISTRY)/,)

default: tidy fmt lint build

build: mod-sum partition-write

partition-write:
$(GOBUILD) $(GOMOD) -o bin/partition-write cmd/*.go

fmt: groupimports
go fmt ./...

mod-sum:
$(GO) mod tidy

tidy:
@echo "go mod tidy"
GO111MODULE=on go mod tidy
@git diff --exit-code -- go.mod

lint: revive
@echo "linting"
revive -formatter friendly -config revive.toml $$($(PACKAGES))

revive:
ifeq (,$(shell which revive))
@echo "installing revive"
$(GO) get github.com/mgechev/revive@v1.0.2
endif

groupimports: install-goimports
goimports -w -l -local github.com/pingcap/tipocket .

install-goimports:
ifeq (,$(shell which goimports))
@echo "installing goimports"
go get golang.org/x/tools/cmd/goimports
endif

clean:
@rm -rf bin/*

test:
$(GOTEST) ./...

.PHONY: all clean build
145 changes: 145 additions & 0 deletions testcase/partition-write/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package testcase

import (
"context"
"database/sql"
"fmt"
"math/rand"
"strings"
"sync"

"github.com/ngaut/log"

"github.com/pingcap/tipocket/pkg/cluster"
"github.com/pingcap/tipocket/pkg/core"
"github.com/pingcap/tipocket/pkg/test-infra/fixture"
"github.com/pingcap/tipocket/util"
)

// CaseCreator is a creator of test client
type CaseCreator struct{}

// Create creates a test client
func (c CaseCreator) Create(node cluster.ClientNode) core.Client {
return &Client{}
}

// Client defines how our test case works
type Client struct {
db *sql.DB
}

// SetUp implements the core.Client interface.
func (c *Client) SetUp(ctx context.Context, _ []cluster.Node, clientNodes []cluster.ClientNode, idx int) error {
log.Info("start to setup client...")
node := clientNodes[idx]
dsn := fmt.Sprintf("root@tcp(%s:%d)/test", node.IP, node.Port)
util.SetMySQLProxy(fixture.Context.MySQLProxy)
db, err := util.OpenDB(dsn, 16)
if err != nil {
log.Fatalf("open db error: %v", err)
}

util.MustExec(db, "drop table if exists t")
util.MustExec(db, "create table t(id int)")
util.MustExec(db, "insert into t(id) values(1)")

c.db = db
return nil
}

// TearDown implements the core.Client interface.
func (c *Client) TearDown(ctx context.Context, nodes []cluster.ClientNode, idx int) error {
util.MustExec(c.db, "drop table t")
return nil
}

var alphabet []byte = []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789")

func padString(rng *rand.Rand) string {
var buf strings.Builder
buf.Grow(256)
for i := 0; i < 256; i++ {
buf.WriteByte(alphabet[rand.Intn(62)])
}
return buf.String()
}

// Start implements the core.StandardClientExtensions interface.
func (c *Client) Start(ctx context.Context, cfg interface{}, clientNodes []cluster.ClientNode) error {
if _, err := c.db.ExecContext(ctx, "drop table if exists t"); err != nil {
return err
}
if _, err := c.db.ExecContext(ctx, "create table t (a bigint, pad text, key k(a)) "+
"SHARD_ROW_ID_BITS = 4 PRE_SPLIT_REGIONS = 4 partition by range (a) ("+
"partition p0 values less than (1000000), "+
"partition p1 values less than (2000000), "+
"partition p2 values less than (3000000))"); err != nil {
return err
}

current := 0
for ctx.Err() == nil {
var wg sync.WaitGroup
for i := 0; i < 512; i++ {
conn, err := c.db.Conn(ctx)
if err != nil {
log.Errorf("failed to get connection: %v", err)
continue
}
wg.Add(1)
go func(conn *sql.Conn) {
defer func() {
conn.Close()
wg.Done()
}()
rng := rand.New(rand.NewSource(rand.Int63()))
for j := 0; j < 60000; j++ {
a := rng.Intn(1000000) + current*1000000
pad := padString(rng)
if _, err := conn.ExecContext(ctx, "insert into t (a, pad) values (?, ?)", a, pad); err != nil {
log.Errorf("failed to insert: %v", err)
if ctx.Err() != nil {
return
}
}
}
}(conn)
}
wg.Wait()

go func(p int) {
toBeCreated := p + 3
if _, err := c.db.ExecContext(ctx, fmt.Sprintf("alter table t add partition (partition p%d values less than (%d))", toBeCreated, (toBeCreated+1)*1000000)); err != nil {
log.Errorf("failed to add partition p%d: %v", toBeCreated, err)
} else {
log.Infof("succeed to add partition p%d", toBeCreated)
}
toBeDeleted := p - 3
if toBeDeleted >= 0 {
if _, err := c.db.ExecContext(ctx, fmt.Sprintf("alter table t drop partition p%d", toBeDeleted)); err != nil {
log.Errorf("failed to drop partition p%d: %v", toBeDeleted, err)
} else {
log.Infof("succeed to drop partition p%d", toBeDeleted)
}
}

}(current)

current++
}
return ctx.Err()
}
50 changes: 50 additions & 0 deletions testcase/partition-write/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"

// use mysql
_ "github.com/go-sql-driver/mysql"

"github.com/pingcap/tipocket/cmd/util"
logs "github.com/pingcap/tipocket/logsearch/pkg/logs"
"github.com/pingcap/tipocket/pkg/cluster"
"github.com/pingcap/tipocket/pkg/control"
test_infra "github.com/pingcap/tipocket/pkg/test-infra"
"github.com/pingcap/tipocket/pkg/test-infra/fixture"

testcase "github.com/pingcap/tipocket/testcase/partition-write"
)

func main() {
flag.Parse()
cfg := control.Config{
Mode: control.ModeStandard,
ClientCount: 1,
RunTime: fixture.Context.RunTime,
}
c := fixture.Context
suit := util.Suit{
Config: &cfg,
Provider: cluster.NewDefaultClusterProvider(),
ClientCreator: testcase.CaseCreator{},
NemesisGens: util.ParseNemesisGenerators(fixture.Context.Nemesis),
ClusterDefs: test_infra.NewDefaultCluster(c.Namespace, c.ClusterName, c.TiDBClusterConfig),
LogsClient: logs.NewDiagnosticLogClient(),
}
suit.Run(context.Background())
}
46 changes: 46 additions & 0 deletions testcase/partition-write/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
module github.com/pingcap/tipocket/testcase/partition-write

go 1.16

require (
github.com/go-sql-driver/mysql v1.5.0
github.com/ngaut/log v0.0.0-20180314031856-b8e36e7ba5ac
github.com/pingcap/tipocket v1.0.0
github.com/pingcap/tipocket/logsearch v1.0.0
)

replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

replace github.com/uber-go/atomic => go.uber.org/atomic v1.5.0

replace (
k8s.io/api => k8s.io/api v0.17.0
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.0
k8s.io/apimachinery => k8s.io/apimachinery v0.17.0
k8s.io/apiserver => k8s.io/apiserver v0.17.0
k8s.io/cli-runtime => k8s.io/cli-runtime v0.17.0
k8s.io/client-go => k8s.io/client-go v0.17.0
k8s.io/cloud-provider => k8s.io/cloud-provider v0.17.0
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.17.0
k8s.io/code-generator => k8s.io/code-generator v0.17.0
k8s.io/component-base => k8s.io/component-base v0.17.0
k8s.io/cri-api => k8s.io/cri-api v0.17.0
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.17.0
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.17.0
k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.17.0
k8s.io/kube-proxy => k8s.io/kube-proxy v0.17.0
k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.17.0
k8s.io/kubectl => k8s.io/kubectl v0.17.0
k8s.io/kubelet => k8s.io/kubelet v0.17.0
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.17.0
k8s.io/metrics => k8s.io/metrics v0.17.0
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.17.0
)

replace github.com/Azure/go-autorest => github.com/Azure/go-autorest v12.2.0+incompatible

replace golang.org/x/net v0.0.0-20190813000000-74dc4d7220e7 => golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7

replace github.com/pingcap/tipocket => ../../.

replace github.com/pingcap/tipocket/logsearch => ../../logsearch
Loading