Skip to content

Commit

Permalink
Support cancel task and add benchmark status (pingcap#43)
Browse files Browse the repository at this point in the history
* support cancel job and refine demo

* make benchmark great again

* fmt

* support task recover

* fix ci
  • Loading branch information
hanfei1991 authored Dec 11, 2021
1 parent cefc756 commit 87a6e61
Show file tree
Hide file tree
Showing 24 changed files with 905 additions and 1,761 deletions.
5 changes: 5 additions & 0 deletions cmd/executor/example1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
join = "127.0.0.1:10240"
worker-addr = "127.0.0.1:10244"
keepalive-ttl = "10s"
keepalive-interval = "500ms"
session-ttl = 10
6 changes: 3 additions & 3 deletions cmd/master-client/bench-example.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
flow-id = "feed1"
table-num = 10
servers = ["127.0.0.1:50051","127.0.0.1:50052","127.0.0.1:50053"]
rcd-cnt = 100000
ddl-freq = 1000
servers = ["127.0.0.1:50046","127.0.0.1:50047","127.0.0.1:50048","127.0.0.1:50049","127.0.0.1:50050","127.0.0.1:50051","127.0.0.1:50052","127.0.0.1:50053"]
rcd-cnt = 1000000000
ddl-freq = 100000000
92 changes: 57 additions & 35 deletions cmd/master-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,88 @@ import (
"flag"
"fmt"
"os"
"strconv"

"github.com/hanfei1991/microcosm/master"
"github.com/hanfei1991/microcosm/master/jobmaster/benchmark"
"github.com/hanfei1991/microcosm/pb"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

func main() {
cmd := os.Args[1]
addr := ""
switch cmd {
case "submit-job":
case "submit-job", "cancel-job":
flag1 := os.Args[2]
if flag1 != "--master-addr" {
fmt.Printf("no master address found")
os.Exit(1)
}
addr = os.Args[3]

case "help":
default:
fmt.Printf("submit-job --config configFile")
os.Exit(0)
}
conn, err := grpc.Dial(addr, grpc.WithInsecure())
ctx := context.Background()
clt, err := master.NewMasterClient(ctx, []string{addr})
if err != nil {
fmt.Printf("err: %v", err)
}
clt := pb.NewMasterClient(conn)

args := os.Args[4:]
cfg := benchmark.NewConfig()
err = cfg.Parse(args)
switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
os.Exit(0)
default:
fmt.Printf("err1: %v", err)
os.Exit(2)
}

configJSON, err := json.Marshal(cfg)
if err != nil {
fmt.Printf("err2: %v", err)
}
if cmd == "submit-job" {
args := os.Args[4:]
cfg := benchmark.NewConfig()
err = cfg.Parse(args)
switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
os.Exit(0)
default:
fmt.Printf("err1: %v", err)
os.Exit(2)
}

req := &pb.SubmitJobRequest{
Tp: pb.SubmitJobRequest_Benchmark,
Config: configJSON,
User: "hanfei",
}
configJSON, err := json.Marshal(cfg)
if err != nil {
fmt.Printf("err2: %v", err)
}

resp, err := clt.SubmitJob(context.Background(), req)
if err != nil {
fmt.Printf("err: %v", err)
return
req := &pb.SubmitJobRequest{
Tp: pb.SubmitJobRequest_Benchmark,
Config: configJSON,
User: "hanfei",
}
resp, err := clt.SubmitJob(context.Background(), req)
if err != nil {
fmt.Printf("err: %v", err)
return
}
if resp.Err != nil {
fmt.Printf("err: %v", resp.Err.Message)
return
}
fmt.Printf("submit job successful %d", resp.JobId)
}
if resp.Err != nil {
fmt.Printf("err: %v", resp.Err.Message)
return
if cmd == "cancel-job" {
flag1 := os.Args[4]
jobID, err := strconv.Atoi(flag1)
if err != nil {
fmt.Print(err.Error())
os.Exit(1)
}
req := &pb.CancelJobRequest{
JobId: int32(jobID),
}
resp, err := clt.CancelJob(context.Background(), req)
if err != nil {
fmt.Printf("err: %v", err)
return
}
if resp.Err != nil {
fmt.Printf("err: %v", resp.Err.Message)
return
}
fmt.Print("cancel job successful")
}
fmt.Print("submit job successful")
}
107 changes: 87 additions & 20 deletions executor/runtime/benchmark/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/pingcap/ticdc/dm/pkg/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

type fileWriter struct {
Expand All @@ -35,11 +36,30 @@ func (f *fileWriter) Prepare() error {
return err
}

func (f *fileWriter) write(_ *runtime.TaskContext, r *runtime.Record) error {
r.End = time.Now()
str := []byte(r.String())
// ctx.stats[f.tid].recordCnt ++
// ctx.stats[f.tid].totalLag += r.end.Sub(r.start)
// sprintPayload formats a benchmark record for the output file: its Tid and
// Pk fields, followed by every TimeTracer entry (converted from Unix
// nanoseconds to a time.Time) with a trailing space after each one.
func sprintPayload(r *pb.Record) string {
	out := fmt.Sprintf("tid %d, pk %d, time tracer ", r.Tid, r.Pk)
	for i := 0; i < len(r.TimeTracer); i++ {
		// time.Time's String method is what the %s verb would have used.
		out += time.Unix(0, r.TimeTracer[i]).String() + " "
	}
	return out
}

// sprintRecord renders one runtime record as a newline-terminated line with
// its flow ID, start time, end time and formatted payload.
//
// The start time is the first TimeTracer entry of the payload. The function
// panics if the payload is not a *pb.Record or if TimeTracer is empty.
func sprintRecord(r *runtime.Record) string {
	payload := r.Payload.(*pb.Record)
	begin := time.Unix(0, payload.TimeTracer[0])
	return fmt.Sprintf(
		"flowID %s start %s end %s payload: %s\n",
		r.FlowID,
		begin.String(),
		r.End.String(),
		sprintPayload(payload),
	)
}

// writeStats writes the textual form of s to the writer's file descriptor
// and returns the error from that write, if any.
func (f *fileWriter) writeStats(s *recordStats) error {
	summary := s.String()
	if _, err := f.fd.Write([]byte(summary)); err != nil {
		return err
	}
	return nil
}

// write formats r, folds it into the running statistics s, and writes the
// formatted line to the writer's file descriptor.
//
// The lag added to s.totalLag is r.End minus the first TimeTracer timestamp
// of the payload. The statistics are updated before the write is attempted,
// so they also count records whose write returned an error.
func (f *fileWriter) write(s *recordStats, r *runtime.Record) error {
	line := sprintRecord(r)
	firstTS := r.Payload.(*pb.Record).TimeTracer[0]
	s.totalLag += r.End.Sub(time.Unix(0, firstTS))
	s.cnt++
	_, err := f.fd.Write([]byte(line))
	return err
}
Expand Down Expand Up @@ -77,7 +97,7 @@ func (o *opReceive) dial() (client pb.TestServiceClient, err error) {
}
client = mock.NewTestClient(conn)
} else {
conn, err := grpc.Dial(o.addr, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.Dial(o.addr, grpc.WithInsecure(), grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}))
o.conn = conn
if err != nil {
return nil, errors.New("conn failed")
Expand All @@ -88,6 +108,10 @@ func (o *opReceive) dial() (client pb.TestServiceClient, err error) {
}

// Prepare does nothing and always returns nil. The connection is set up by
// connect, which Next calls from the goroutine it starts on first use.
func (o *opReceive) Prepare() error {
return nil
}

func (o *opReceive) connect() error {
client, err := o.dial()
if err != nil {
return errors.New("conn failed")
Expand All @@ -106,6 +130,12 @@ func (o *opReceive) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([]
if !o.running {
o.running = true
go func() {
err := o.connect()
if err != nil {
o.errCh <- err
log.L().Error("opReceive meet error", zap.Error(err))
return
}
for {
record, err := o.binlogClient.Recv()
if err != nil {
Expand All @@ -128,6 +158,8 @@ func (o *opReceive) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([]
noMoreData := false
select {
case r := <-o.data:
payload := r.Payload.(*pb.Record)
payload.TimeTracer = append(payload.TimeTracer, time.Now().UnixNano())
o.cache = append(o.cache, r)
case err := <-o.errCh:
return nil, true, err
Expand Down Expand Up @@ -158,6 +190,7 @@ func (o *opSyncer) syncDDL(ctx *runtime.TaskContext) {

func (o *opSyncer) Next(ctx *runtime.TaskContext, r *runtime.Record, _ int) ([]runtime.Chunk, bool, error) {
record := r.Payload.(*pb.Record)
record.TimeTracer = append(record.TimeTracer, time.Now().UnixNano())
if record.Tp == pb.Record_DDL {
go o.syncDDL(ctx)
return nil, true, nil
Expand All @@ -167,23 +200,37 @@ func (o *opSyncer) Next(ctx *runtime.TaskContext, r *runtime.Record, _ int) ([]r

// NextWantedInputIdx always returns 0.
// NOTE(review): presumably the index of the single input this operator
// consumes — confirm against the runtime's operator interface.
func (o *opSyncer) NextWantedInputIdx() int { return 0 }

// recordStats accumulates the number of records seen and the sum of their
// lags so that an average latency can be reported.
type recordStats struct {
	totalLag time.Duration // sum of the per-record lags
	cnt      int64         // number of records accumulated
}

// String reports the record count and the average latency in milliseconds
// with three decimal places.
//
// The average is computed from the full-precision duration, not from a total
// already truncated to whole milliseconds. When no record has been counted
// it is reported as 0.000, avoiding the NaN that 0/0 would produce. The
// "lantency" spelling is kept as-is because it is part of the emitted output.
func (s *recordStats) String() string {
	avgMs := 0.0
	if s.cnt > 0 {
		avgMs = float64(s.totalLag) / float64(time.Millisecond) / float64(s.cnt)
	}
	return fmt.Sprintf("total record %d, average lantency %.3f ms", s.cnt, avgMs)
}

// opSink is the terminal operator: it stamps each record's end time and
// hands it to the file writer (or to the test context when the global test
// flag is set).
type opSink struct {
// writer is prepared in Prepare and used by Next and Close.
writer fileWriter
// stats is allocated in Prepare, updated through writer.write in Next,
// and written out in Close.
stats *recordStats
}

func (o *opSink) Close() error { return nil }
// Close writes the accumulated record statistics through the sink's file
// writer and returns the error from that write, if any.
func (o *opSink) Close() error {
	stats := o.stats
	return o.writer.writeStats(stats)
}

// Prepare installs a fresh, zeroed statistics accumulator and then prepares
// the underlying file writer, returning the writer's error if any.
func (o *opSink) Prepare() error {
	o.stats = &recordStats{}
	return o.writer.Prepare()
}

func (o *opSink) Next(ctx *runtime.TaskContext, r *runtime.Record, _ int) ([]runtime.Chunk, bool, error) {
r.End = time.Now()
if test.GlobalTestFlag {
// log.L().Info("send record", zap.Int32("table", r.Tid), zap.Int32("pk", r.payload.(*pb.Record).Pk))
ctx.TestCtx.SendRecord(r)
return nil, false, nil
}
return nil, false, o.writer.write(ctx, r)
return nil, false, o.writer.write(o.stats, r)
}

// NextWantedInputIdx always returns 0.
// NOTE(review): presumably the index of the single input this operator
// consumes — confirm against the runtime's operator interface.
func (o *opSink) NextWantedInputIdx() int { return 0 }
Expand All @@ -196,6 +243,8 @@ type opProducer struct {

ddlFrequency int32
outputCnt int

checkpoint time.Time
}

// Close is a no-op for the producer; it always returns nil.
func (o *opProducer) Close() error { return nil }
Expand All @@ -207,21 +256,23 @@ func (o *opProducer) NextWantedInputIdx() int { return runtime.DontNeedData }
func (o *opProducer) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([]runtime.Chunk, bool, error) {
outputData := make([]runtime.Chunk, o.outputCnt)
binlogID := 0
if o.checkpoint.Add(50 * time.Millisecond).After(time.Now()) {
return nil, true, nil
}
for i := 0; i < 128; i++ {
if o.pk >= o.dataCnt {
return outputData, true, nil
}
o.pk++
if o.pk == 10000 {
log.L().Info("got 10000 data")
}
start := time.Now()
if o.pk%o.ddlFrequency == 0 {
o.schemaVer++
for i := range outputData {
payload := &pb.Record{
Tp: pb.Record_DDL,
Tid: o.tid,
SchemaVer: o.schemaVer,
Tp: pb.Record_DDL,
Tid: o.tid,
SchemaVer: o.schemaVer,
TimeTracer: []int64{start.UnixNano()},
}
r := runtime.Record{
Tid: o.tid,
Expand All @@ -231,10 +282,11 @@ func (o *opProducer) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([
}
}
payload := &pb.Record{
Tp: pb.Record_Data,
Tid: o.tid,
SchemaVer: o.schemaVer,
Pk: o.pk,
Tp: pb.Record_Data,
Tid: o.tid,
SchemaVer: o.schemaVer,
Pk: o.pk,
TimeTracer: []int64{start.UnixNano()},
}
r := runtime.Record{
Tid: o.tid,
Expand All @@ -243,15 +295,27 @@ func (o *opProducer) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([
outputData[binlogID] = append(outputData[binlogID], &r)
binlogID = (binlogID + 1) % o.outputCnt
}
if !test.GlobalTestFlag {
o.checkpoint = time.Now()
go func() {
time.Sleep(55 * time.Millisecond)
ctx.Wake()
}()
return outputData, true, nil
}
return outputData, false, nil
}

// stoppable is the single-method interface used as the type of
// opBinlog.server, so that any server exposing Stop can be stored there;
// opBinlog.Prepare assigns the value returned by grpc.NewServer to it.
type stoppable interface {
// Stop shuts the server down.
// NOTE(review): exact shutdown semantics depend on the implementation — confirm.
Stop()
}

type opBinlog struct {
binlogChan chan *runtime.Record
wal []*runtime.Record
addr string

server mock.GrpcServer
server stoppable
cacheRecord *runtime.Record
ctx *runtime.TaskContext
}
Expand All @@ -271,6 +335,7 @@ func (o *opBinlog) Prepare() (err error) {
return err
}
s := grpc.NewServer()
o.server = s
pb.RegisterTestServiceServer(s, o)
go func() {
err1 := s.Serve(lis)
Expand Down Expand Up @@ -322,8 +387,10 @@ func (o *opBinlog) FeedBinlog(req *pb.TestBinlogRequest, server pb.TestService_F
}
}
for record := range o.binlogChan {
r := record.Payload.(*pb.Record)
r.TimeTracer = append(r.TimeTracer, time.Now().UnixNano())
o.wal = append(o.wal, record)
err := server.Send(o.wal[id].Payload.(*pb.Record))
err := server.Send(r)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions executor/runtime/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,15 @@ func (s *Runtime) SubmitTasks(tasks []*model.Task) error {
}
}

if s.tasks == nil {
s.tasks = make(map[model.TaskID]*taskContainer)
}
for _, t := range taskSet {
err := t.prepare()
if err != nil {
return err
}
s.tasks[t.id] = t
}

log.L().Logger.Info("begin to push")
Expand Down
Loading

0 comments on commit 87a6e61

Please sign in to comment.