Skip to content
This repository has been archived by the owner on Dec 9, 2024. It is now read-only.

[DNM] Stress test with high write load #410

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ build: bindir consistency isolation pocket on-dup sqllogic block-writer \
region-available crud \
read-stress follower-read pessimistic resolve-lock cdc-bank \
example ttl \
write-stress \
# +tipocket:scaffold:makefile_build

bindir:
Expand Down Expand Up @@ -134,6 +135,10 @@ ttl:
cd testcase/ttl ; make build; \
cp bin/* ../../bin/

write-stress:
cd testcase/write-stress; make build; \
cp bin/* ../../bin/

# +tipocket:scaffold:makefile_build_cmd

tipocket:
Expand Down
4 changes: 4 additions & 0 deletions run/lib/case.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,9 @@
[
'/bin/tll',
],
'write-stress'(args={})::
[
'/bin/write-stress',
],
// +tipocket:scaffold:case_decls
}
11 changes: 11 additions & 0 deletions run/workflow/write-stress.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
_config+:: {
case_name: 'write-stress',
image_name: 'hub.pingcap.net/qa/tipocket',
args+: {
// k8s configurations
// 'storage-class': 'local-storage',
},
command: {},
},
}
60 changes: 60 additions & 0 deletions testcase/write-stress/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

GOARCH := $(if $(GOARCH),$(GOARCH),amd64)
GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) GO111MODULE=on go
GOTEST=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GO111MODULE=on go test # go race detector requires cgo
VERSION := $(if $(VERSION),$(VERSION),latest)

PACKAGES := go list ./...| grep -vE 'vendor'

LDFLAGS += -s -w
LDFLAGS += -X "github.com/pingcap/tipocket/pkg/test-infra/fixture.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/tipocket/pkg/test-infra/fixture.BuildHash=$(shell git rev-parse HEAD)"

GOBUILD=$(GO) build -ldflags '$(LDFLAGS)'

DOCKER_REGISTRY_PREFIX := $(if $(DOCKER_REGISTRY),$(DOCKER_REGISTRY)/,)

default: tidy fmt lint build

build: mod-sum write-stress

write-stress:
$(GOBUILD) $(GOMOD) -o bin/write-stress cmd/*.go

fmt: groupimports
go fmt ./...

mod-sum:
$(GO) mod tidy

tidy:
@echo "go mod tidy"
GO111MODULE=on go mod tidy
@git diff --exit-code -- go.mod

lint: revive
@echo "linting"
revive -formatter friendly -config revive.toml $$($(PACKAGES))

revive:
ifeq (,$(shell which revive))
@echo "installing revive"
$(GO) get github.com/mgechev/revive@v1.0.2
endif

groupimports: install-goimports
goimports -w -l -local github.com/pingcap/tipocket .

install-goimports:
ifeq (,$(shell which goimports))
@echo "installing goimports"
go get golang.org/x/tools/cmd/goimports
endif

clean:
@rm -rf bin/*

test:
$(GOTEST) ./...

.PHONY: all clean build
101 changes: 101 additions & 0 deletions testcase/write-stress/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package testcase

import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"sync"

"github.com/ngaut/log"
"github.com/pingcap/errors"

"github.com/pingcap/tipocket/pkg/cluster"
"github.com/pingcap/tipocket/util"
)

type appendClient struct {
baseClient
}

func (c *appendClient) SetUp(ctx context.Context, nodes []cluster.Node, clientNodes []cluster.ClientNode, idx int) error {
if err := c.baseClient.SetUp(ctx, nodes, clientNodes, idx); err != nil {
return err
}
// Use 32 threads to create tables.
var wg sync.WaitGroup
for i := 0; i < 32; i++ {
wg.Add(1)
go func(tid int) {
defer wg.Done()
for j := 0; j < c.tables; j++ {
if j%32 == tid {
if c.dropTable {
util.MustExec(c.db, fmt.Sprintf("drop table if exists write_stress%d", j+1))
}
util.MustExec(c.db, fmt.Sprintf("create table if not exists write_stress%d(col1 bigint, col2 varchar(256), data longtext, key k(col1, col2))", j+1))
}
}
}(i)
}
wg.Wait()
return nil
}

func (c *appendClient) Start(ctx context.Context, cfg interface{}, clientNodes []cluster.ClientNode) error {
var wg sync.WaitGroup
for i := 0; i < c.concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
}
err := c.runClient(ctx)
log.Error(err)
}
}()
}

wg.Wait()
log.Info("everything is ok!")
return nil
}

func (c *appendClient) runClient(ctx context.Context) error {
rng := rand.New(rand.NewSource(rand.Int63()))

col2 := make([]byte, 192)
data := make([]byte, c.padLength)
for {
col1 := rng.Int63()
col2Len := rng.Intn(192)
_, _ = rng.Read(col2[:col2Len])
dataLen := rng.Intn(c.padLength)
_, _ = rng.Read(data[:dataLen])
tid := rng.Int()%c.tables + 1
sql := fmt.Sprintf("insert into write_stress%d values (?, ?, ?)", tid)
_, err := c.db.ExecContext(ctx, sql, col1,
base64.StdEncoding.EncodeToString(col2[:col2Len]),
base64.StdEncoding.EncodeToString(data[:dataLen]))
if err != nil {
return errors.Trace(err)
}
}
}
79 changes: 79 additions & 0 deletions testcase/write-stress/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package testcase

import (
"context"
"database/sql"
"fmt"

"github.com/ngaut/log"

"github.com/pingcap/tipocket/pkg/cluster"
"github.com/pingcap/tipocket/pkg/core"
"github.com/pingcap/tipocket/pkg/test-infra/fixture"
"github.com/pingcap/tipocket/util"
)

// CaseCreator is a creator of test client
type CaseCreator struct {
CaseName string
Concurrency int
Tables int
PadLength int
DropTable bool
}

// Create creates a test client
func (c CaseCreator) Create(node cluster.ClientNode) core.Client {
base := baseClient{
concurrency: c.Concurrency,
tables: c.Tables,
padLength: c.PadLength,
dropTable: c.DropTable,
}
switch c.CaseName {
case "uniform":
return &uniformClient{base}
case "append":
return &appendClient{base}
}
return nil
}

type baseClient struct {
db *sql.DB
concurrency int
tables int
padLength int
dropTable bool
}

func (c *baseClient) SetUp(ctx context.Context, _ []cluster.Node, clientNodes []cluster.ClientNode, idx int) error {
log.Info("start to setup client...")
node := clientNodes[idx]
dsn := fmt.Sprintf("root@tcp(%s:%d)/test", node.IP, node.Port)
util.SetMySQLProxy(fixture.Context.MySQLProxy)
db, err := util.OpenDB(dsn, c.concurrency)
if err != nil {
log.Fatalf("open db error: %v", err)
}

c.db = db
return nil
}

func (c *baseClient) TearDown(ctx context.Context, nodes []cluster.ClientNode, idx int) error {
return nil
}
65 changes: 65 additions & 0 deletions testcase/write-stress/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"flag"

// use mysql
_ "github.com/go-sql-driver/mysql"

"github.com/pingcap/tipocket/cmd/util"
logs "github.com/pingcap/tipocket/logsearch/pkg/logs"
"github.com/pingcap/tipocket/pkg/cluster"
"github.com/pingcap/tipocket/pkg/control"
test_infra "github.com/pingcap/tipocket/pkg/test-infra"
"github.com/pingcap/tipocket/pkg/test-infra/fixture"

testcase "github.com/pingcap/tipocket/testcase/write-stress"
)

var (
concurrency = flag.Int("concurrency", 1024, "write concurrency")
tables = flag.Int("tables", 1, "total tables")
padLength = flag.Int("pad-length", 65536, "pad string length")
dropTable = flag.Bool("drop-table", false, "drop existed tables")

caseName = flag.String("case-name", "uniform", "test case name")
)

func main() {
flag.Parse()
cfg := control.Config{
Mode: control.ModeStandard,
ClientCount: 1,
RunTime: fixture.Context.RunTime,
}
c := fixture.Context
suit := util.Suit{
Config: &cfg,
Provider: cluster.NewDefaultClusterProvider(),
ClientCreator: testcase.CaseCreator{
CaseName: *caseName,
Concurrency: *concurrency,
Tables: *tables,
PadLength: *padLength,
DropTable: *dropTable,
},
NemesisGens: util.ParseNemesisGenerators(fixture.Context.Nemesis),
ClusterDefs: test_infra.NewDefaultCluster(c.Namespace, c.ClusterName, c.TiDBClusterConfig),
LogsClient: logs.NewDiagnosticLogClient(),
}
suit.Run(context.Background())
}
Loading