From 87a6e61265af722904fc53e4eacd601aa1599703 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Sat, 11 Dec 2021 15:24:30 +0800 Subject: [PATCH] Support cancel task and add benchmark status (#43) * support cancel job and refine demo * make benchmark great again * fmt * support task recover * fix ci --- cmd/executor/example1.toml | 5 + cmd/master-client/bench-example.toml | 6 +- cmd/master-client/main.go | 92 +- executor/runtime/benchmark/operator.go | 107 +- executor/runtime/build.go | 4 + executor/runtime/runtime.go | 38 +- executor/runtime/task.go | 45 +- executor/server.go | 13 +- master/client.go | 4 + master/cluster/executor_client.go | 7 + master/jobmanager.go | 15 + master/jobmaster/benchmark/master.go | 13 + master/jobmaster/system/impl.go | 36 + master/jobmaster/system/interface.go | 2 + master/server.go | 4 + model/job.go | 1 + pb/CDCPeerToPeer.pb.go | 1552 ------------------------ pb/master.pb.go | 479 +++++++- pb/test.pb.go | 154 ++- proto/CDCPeerToPeer.proto | 60 - proto/master.proto | 10 + proto/test.proto | 5 +- test/job_test.go | 7 +- test/mock/grpc.go | 7 + 24 files changed, 905 insertions(+), 1761 deletions(-) create mode 100644 cmd/executor/example1.toml delete mode 100644 pb/CDCPeerToPeer.pb.go delete mode 100644 proto/CDCPeerToPeer.proto diff --git a/cmd/executor/example1.toml b/cmd/executor/example1.toml new file mode 100644 index 00000000000..10734b03907 --- /dev/null +++ b/cmd/executor/example1.toml @@ -0,0 +1,5 @@ +join = "127.0.0.1:10240" +worker-addr = "127.0.0.1:10244" +keepalive-ttl = "10s" +keepalive-interval = "500ms" +session-ttl = 10 diff --git a/cmd/master-client/bench-example.toml b/cmd/master-client/bench-example.toml index 0ed0c4ecf33..6e77de922c2 100644 --- a/cmd/master-client/bench-example.toml +++ b/cmd/master-client/bench-example.toml @@ -1,5 +1,5 @@ flow-id = "feed1" table-num = 10 -servers = ["127.0.0.1:50051","127.0.0.1:50052","127.0.0.1:50053"] -rcd-cnt = 100000 -ddl-freq = 1000 +servers = ["127.0.0.1:50046","127.0.0.1:50047","127.0.0.1:50048","127.0.0.1:50049","127.0.0.1:50050","127.0.0.1:50051","127.0.0.1:50052","127.0.0.1:50053"] +rcd-cnt = 1000000000 +ddl-freq = 100000000 diff --git a/cmd/master-client/main.go b/cmd/master-client/main.go index 9b03f7fbe00..c917f6a1aac 100644 --- a/cmd/master-client/main.go +++ b/cmd/master-client/main.go @@ -6,66 +6,88 @@ import ( "flag" "fmt" "os" + "strconv" + "github.com/hanfei1991/microcosm/master" "github.com/hanfei1991/microcosm/master/jobmaster/benchmark" "github.com/hanfei1991/microcosm/pb" "github.com/pkg/errors" - "google.golang.org/grpc" ) func main() { cmd := os.Args[1] addr := "" switch cmd { - case "submit-job": + case "submit-job", "cancel-job": flag1 := os.Args[2] if flag1 != "--master-addr" { fmt.Printf("no master address found") os.Exit(1) } addr = os.Args[3] - - case "help": + default: fmt.Printf("submit-job --config configFile") os.Exit(0) } - conn, err := grpc.Dial(addr, grpc.WithInsecure()) + ctx := context.Background() + clt, err := master.NewMasterClient(ctx, []string{addr}) if err != nil { fmt.Printf("err: %v", err) } - clt := pb.NewMasterClient(conn) - - args := os.Args[4:] - cfg := benchmark.NewConfig() - err = cfg.Parse(args) - switch errors.Cause(err) { - case nil: - case flag.ErrHelp: - os.Exit(0) - default: - fmt.Printf("err1: %v", err) - os.Exit(2) - } - configJSON, err := json.Marshal(cfg) - if err != nil { - fmt.Printf("err2: %v", err) - } + if cmd == "submit-job" { + args := os.Args[4:] + cfg := benchmark.NewConfig() + err = cfg.Parse(args) + switch errors.Cause(err) { + case nil: + case flag.ErrHelp: + os.Exit(0) + default: + fmt.Printf("err1: %v", err) + os.Exit(2) + } - req := &pb.SubmitJobRequest{ - Tp: pb.SubmitJobRequest_Benchmark, - Config: configJSON, - User: "hanfei", - } + configJSON, err := json.Marshal(cfg) + if err != nil { + fmt.Printf("err2: %v", err) + } - resp, err := clt.SubmitJob(context.Background(), req) - if err != nil { - fmt.Printf("err: %v", err) - return + req := &pb.SubmitJobRequest{ + Tp: pb.SubmitJobRequest_Benchmark, + Config: configJSON, + User: "hanfei", + } + resp, err := clt.SubmitJob(context.Background(), req) + if err != nil { + fmt.Printf("err: %v", err) + return + } + if resp.Err != nil { + fmt.Printf("err: %v", resp.Err.Message) + return + } + fmt.Printf("submit job successful %d", resp.JobId) } - if resp.Err != nil { - fmt.Printf("err: %v", resp.Err.Message) - return + if cmd == "cancel-job" { + flag1 := os.Args[4] + jobID, err := strconv.Atoi(flag1) + if err != nil { + fmt.Print(err.Error()) + os.Exit(1) + } + req := &pb.CancelJobRequest{ + JobId: int32(jobID), + } + resp, err := clt.CancelJob(context.Background(), req) + if err != nil { + fmt.Printf("err: %v", err) + return + } + if resp.Err != nil { + fmt.Printf("err: %v", resp.Err.Message) + return + } + fmt.Print("cancel job successful") } - fmt.Print("submit job successful") } diff --git a/executor/runtime/benchmark/operator.go b/executor/runtime/benchmark/operator.go index 1e0047b1827..7f12745fd81 100644 --- a/executor/runtime/benchmark/operator.go +++ b/executor/runtime/benchmark/operator.go @@ -16,6 +16,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/log" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/backoff" ) type fileWriter struct { @@ -35,11 +36,30 @@ func (f *fileWriter) Prepare() error { return err } -func (f *fileWriter) write(_ *runtime.TaskContext, r *runtime.Record) error { - r.End = time.Now() - str := []byte(r.String()) - // ctx.stats[f.tid].recordCnt ++ - // ctx.stats[f.tid].totalLag += r.end.Sub(r.start) +func sprintPayload(r *pb.Record) string { + str := fmt.Sprintf("tid %d, pk %d, time tracer ", r.Tid, r.Pk) + for _, ts := range r.TimeTracer { + str += fmt.Sprintf("%s ", time.Unix(0, ts)) + } + return str +} + +func sprintRecord(r *runtime.Record) string { + start := time.Unix(0, r.Payload.(*pb.Record).TimeTracer[0]) + return fmt.Sprintf("flowID %s start %s end %s payload: %s\n", r.FlowID, start.String(), r.End.String(), sprintPayload(r.Payload.(*pb.Record))) +} + +func (f *fileWriter) writeStats(s *recordStats) error { + str := []byte(s.String()) + _, err := f.fd.Write(str) + return err +} + +func (f *fileWriter) write(s *recordStats, r *runtime.Record) error { + str := []byte(sprintRecord(r)) + start := time.Unix(0, r.Payload.(*pb.Record).TimeTracer[0]) + s.cnt++ + s.totalLag += r.End.Sub(start) _, err := f.fd.Write(str) return err } @@ -77,7 +97,7 @@ func (o *opReceive) dial() (client pb.TestServiceClient, err error) { } client = mock.NewTestClient(conn) } else { - conn, err := grpc.Dial(o.addr, grpc.WithInsecure(), grpc.WithBlock()) + conn, err := grpc.Dial(o.addr, grpc.WithInsecure(), grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig})) o.conn = conn if err != nil { return nil, errors.New("conn failed") @@ -88,6 +108,10 @@ func (o *opReceive) dial() (client pb.TestServiceClient, err error) { } func (o *opReceive) Prepare() error { + return nil +} + +func (o *opReceive) connect() error { client, err := o.dial() if err != nil { return errors.New("conn failed") @@ -106,6 +130,12 @@ func (o *opReceive) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([] if !o.running { o.running = true go func() { + err := o.connect() + if err != nil { + o.errCh <- err + log.L().Error("opReceive meet error", zap.Error(err)) + return + } for { record, err := o.binlogClient.Recv() if err != nil { @@ -128,6 +158,8 @@ func (o *opReceive) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([] noMoreData := false select { case r := <-o.data: + payload := r.Payload.(*pb.Record) + payload.TimeTracer = append(payload.TimeTracer, time.Now().UnixNano()) o.cache = append(o.cache, r) case err := <-o.errCh: return nil, true, err @@ -158,6 +190,7 @@ func (o *opSyncer) syncDDL(ctx *runtime.TaskContext) { func (o *opSyncer) Next(ctx *runtime.TaskContext, r *runtime.Record, _ int) ([]runtime.Chunk, bool, error) { record := r.Payload.(*pb.Record) + record.TimeTracer = append(record.TimeTracer, time.Now().UnixNano()) if record.Tp == pb.Record_DDL { go o.syncDDL(ctx) return nil, true, nil @@ -167,23 +200,37 @@ func (o *opSyncer) Next(ctx *runtime.TaskContext, r *runtime.Record, _ int) ([]r func (o *opSyncer) NextWantedInputIdx() int { return 0 } +type recordStats struct { + totalLag time.Duration + cnt int64 +} + +func (s *recordStats) String() string { + return fmt.Sprintf("total record %d, average lantency %.3f ms", s.cnt, float64(s.totalLag.Milliseconds())/float64(s.cnt)) +} + type opSink struct { writer fileWriter + stats *recordStats } -func (o *opSink) Close() error { return nil } +func (o *opSink) Close() error { + return o.writer.writeStats(o.stats) +} func (o *opSink) Prepare() error { + o.stats = new(recordStats) return o.writer.Prepare() } func (o *opSink) Next(ctx *runtime.TaskContext, r *runtime.Record, _ int) ([]runtime.Chunk, bool, error) { + r.End = time.Now() if test.GlobalTestFlag { // log.L().Info("send record", zap.Int32("table", r.Tid), zap.Int32("pk", r.payload.(*pb.Record).Pk)) ctx.TestCtx.SendRecord(r) return nil, false, nil } - return nil, false, o.writer.write(ctx, r) + return nil, false, o.writer.write(o.stats, r) } func (o *opSink) NextWantedInputIdx() int { return 0 } @@ -196,6 +243,8 @@ type opProducer struct { ddlFrequency int32 outputCnt int + + checkpoint time.Time } func (o *opProducer) Close() error { return nil } @@ -207,21 +256,23 @@ func (o *opProducer) NextWantedInputIdx() int { return runtime.DontNeedData } func (o *opProducer) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([]runtime.Chunk, bool, error) { outputData := make([]runtime.Chunk, o.outputCnt) binlogID := 0 + if o.checkpoint.Add(50 * time.Millisecond).After(time.Now()) { + return nil, true, nil + } for i := 0; i < 128; i++ { if o.pk >= o.dataCnt { return outputData, true, nil } o.pk++ - if o.pk == 10000 { - log.L().Info("got 10000 data") - } + start := time.Now() if o.pk%o.ddlFrequency == 0 { o.schemaVer++ for i := range outputData { payload := &pb.Record{ - Tp: pb.Record_DDL, - Tid: o.tid, - SchemaVer: o.schemaVer, + Tp: pb.Record_DDL, + Tid: o.tid, + SchemaVer: o.schemaVer, + TimeTracer: []int64{start.UnixNano()}, } r := runtime.Record{ Tid: o.tid, @@ -231,10 +282,11 @@ func (o *opProducer) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([ } } payload := &pb.Record{ - Tp: pb.Record_Data, - Tid: o.tid, - SchemaVer: o.schemaVer, - Pk: o.pk, + Tp: pb.Record_Data, + Tid: o.tid, + SchemaVer: o.schemaVer, + Pk: o.pk, + TimeTracer: []int64{start.UnixNano()}, } r := runtime.Record{ Tid: o.tid, @@ -243,15 +295,27 @@ func (o *opProducer) Next(ctx *runtime.TaskContext, _ *runtime.Record, _ int) ([ outputData[binlogID] = append(outputData[binlogID], &r) binlogID = (binlogID + 1) % o.outputCnt } + if !test.GlobalTestFlag { + o.checkpoint = time.Now() + go func() { + time.Sleep(55 * time.Millisecond) + ctx.Wake() + }() + return outputData, true, nil + } return outputData, false, nil } +type stoppable interface { + Stop() +} + type opBinlog struct { binlogChan chan *runtime.Record wal []*runtime.Record addr string - server mock.GrpcServer + server stoppable cacheRecord *runtime.Record ctx *runtime.TaskContext } @@ -271,6 +335,7 @@ func (o *opBinlog) Prepare() (err error) { return err } s := grpc.NewServer() + o.server = s pb.RegisterTestServiceServer(s, o) go func() { err1 := s.Serve(lis) @@ -322,8 +387,10 @@ func (o *opBinlog) FeedBinlog(req *pb.TestBinlogRequest, server pb.TestService_F } } for record := range o.binlogChan { + r := record.Payload.(*pb.Record) + r.TimeTracer = append(r.TimeTracer, time.Now().UnixNano()) o.wal = append(o.wal, record) - err := server.Send(o.wal[id].Payload.(*pb.Record)) + err := server.Send(r) if err != nil { return err } diff --git a/executor/runtime/build.go b/executor/runtime/build.go index e9cd708fbd6..e0fc0d4f63a 100644 --- a/executor/runtime/build.go +++ b/executor/runtime/build.go @@ -82,11 +82,15 @@ func (s *Runtime) SubmitTasks(tasks []*model.Task) error { } } + if s.tasks == nil { + s.tasks = make(map[model.TaskID]*taskContainer) + } for _, t := range taskSet { err := t.prepare() if err != nil { return err } + s.tasks[t.id] = t } log.L().Logger.Info("begin to push") diff --git a/executor/runtime/runtime.go b/executor/runtime/runtime.go index 72b083656f0..22c04bff885 100644 --- a/executor/runtime/runtime.go +++ b/executor/runtime/runtime.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/hanfei1991/microcosm/model" "github.com/hanfei1991/microcosm/test" ) @@ -30,11 +31,39 @@ func (q *queue) push(t *taskContainer) { } type Runtime struct { - testCtx *test.Context - q queue + testCtx *test.Context + tasksLock sync.Mutex + tasks map[model.TaskID]*taskContainer + q queue + wg sync.WaitGroup } -func (s *Runtime) Run(ctx context.Context) { +func (s *Runtime) Stop(tasks []int32) error { + s.tasksLock.Lock() + defer s.tasksLock.Unlock() + var retErr error + for _, id := range tasks { + if task, ok := s.tasks[model.TaskID(id)]; ok { + err := task.Stop() + if err != nil { + retErr = err + } + delete(s.tasks, task.id) + } + } + return retErr +} + +func (s *Runtime) Run(ctx context.Context, cur int) { + s.wg.Add(cur) + for i := 0; i < cur; i++ { + go s.runImpl(ctx) + } + s.wg.Wait() +} + +func (s *Runtime) runImpl(ctx context.Context) { + defer s.wg.Done() for { select { case <-ctx.Done(): @@ -51,6 +80,9 @@ func (s *Runtime) Run(ctx context.Context) { if t.tryBlock() { continue } + // the status is waking + } else if status == Stop { + continue } t.setRunnable() s.q.push(t) diff --git a/executor/runtime/task.go b/executor/runtime/task.go index c266df4821e..e87bbaa6306 100644 --- a/executor/runtime/task.go +++ b/executor/runtime/task.go @@ -1,9 +1,7 @@ package runtime import ( - // "fmt" - //"log" - "fmt" + "sync" "sync/atomic" "time" @@ -17,20 +15,16 @@ const ( Runnable TaskStatus = iota Blocked Waking + Stop ) type Record struct { - flowID string - start time.Time + FlowID string End time.Time Payload interface{} Tid int32 } -func (r *Record) String() string { - return fmt.Sprintf("flowID %s start %s end %s payload: %v\n", r.flowID, r.start.String(), r.End.String(), r.Payload) -} - type Channel struct { innerChan chan *Record sendCtx *TaskContext @@ -87,6 +81,8 @@ type taskContainer struct { inputs []*Channel outputs []*Channel ctx *TaskContext + + stopLock sync.Mutex } func (t *taskContainer) prepare() error { @@ -97,6 +93,9 @@ func (t *taskContainer) prepare() error { func (t *taskContainer) tryAwake() bool { for { + if atomic.LoadInt32(&t.status) == int32(Stop) { + return false + } // log.Printf("try wake task %d", t.id) if atomic.CompareAndSwapInt32(&t.status, int32(Blocked), int32(Waking)) { // log.Printf("wake task %d successful", t.id) @@ -120,7 +119,7 @@ func (t *taskContainer) tryBlock() bool { } func (t *taskContainer) setRunnable() { - atomic.StoreInt32(&t.status, int32(Runnable)) + atomic.CompareAndSwapInt32(&t.status, int32(Waking), int32(Runnable)) } func (t *taskContainer) tryFlush() (blocked bool) { @@ -166,7 +165,33 @@ const ( DontRequireIndex int = -2 ) +func (t *taskContainer) Stop() error { + for { + if atomic.LoadInt32(&t.status) == int32(Stop) { + return nil + } + + if atomic.CompareAndSwapInt32(&t.status, int32(Runnable), int32(Stop)) { + break + } + + if atomic.CompareAndSwapInt32(&t.status, int32(Blocked), int32(Stop)) { + break + } + } + + t.stopLock.Lock() + defer t.stopLock.Unlock() + + return t.op.Close() +} + func (t *taskContainer) Poll() TaskStatus { + t.stopLock.Lock() + defer t.stopLock.Unlock() + if atomic.LoadInt32(&t.status) == int32(Stop) { + return Stop + } if t.tryFlush() { return Blocked } diff --git a/executor/server.go b/executor/server.go index d3526da82bf..9fdc9c7c7a8 100644 --- a/executor/server.go +++ b/executor/server.go @@ -81,6 +81,15 @@ func (s *Server) SubmitBatchTasks(ctx context.Context, req *pb.SubmitBatchTasksR // CancelBatchTasks implements pb interface. func (s *Server) CancelBatchTasks(ctx context.Context, req *pb.CancelBatchTasksRequest) (*pb.CancelBatchTasksResponse, error) { + log.L().Info("cancel tasks", zap.String("req", req.String())) + err := s.sch.Stop(req.TaskIdList) + if err != nil { + return &pb.CancelBatchTasksResponse{ + Err: &pb.Error{ + Message: err.Error(), + }, + }, nil + } return &pb.CancelBatchTasksResponse{}, nil } @@ -102,7 +111,7 @@ func (s *Server) startForTest(ctx context.Context) (err error) { s.sch = runtime.NewRuntime(s.testCtx) go func() { defer s.cancel() - s.sch.Run(ctx) + s.sch.Run(ctx, 10) }() err = s.selfRegister(ctx) @@ -148,7 +157,7 @@ func (s *Server) Start(ctx context.Context) error { s.sch = runtime.NewRuntime(nil) go func() { defer s.close() - s.sch.Run(ctx1) + s.sch.Run(ctx1, 10) }() err = s.selfRegister(ctx1) diff --git a/master/client.go b/master/client.go index 0e1c125d916..051e761beb8 100644 --- a/master/client.go +++ b/master/client.go @@ -78,6 +78,10 @@ func (c *Client) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (resp return c.client.SubmitJob(ctx, req) } +func (c *Client) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (resp *pb.CancelJobResponse, err error) { + return c.client.CancelJob(ctx, req) +} + func (c *Client) QueryMetaStore( ctx context.Context, req *pb.QueryMetaStoreRequest, timeout time.Duration, ) (*pb.QueryMetaStoreResponse, error) { diff --git a/master/cluster/executor_client.go b/master/cluster/executor_client.go index f1fa4493381..d499750866c 100644 --- a/master/cluster/executor_client.go +++ b/master/cluster/executor_client.go @@ -36,6 +36,8 @@ func (c *executorClient) send(ctx context.Context, req *ExecutorRequest) (*Execu switch req.Cmd { case CmdSubmitBatchTasks: resp.Resp, err = c.client.SubmitBatchTasks(ctx, req.SubmitBatchTasks()) + case CmdCancelBatchTasks: + resp.Resp, err = c.client.CancelBatchTasks(ctx, req.CancelBatchTasks()) } if err != nil { log.L().Logger.Error("send req meet error", zap.Error(err)) @@ -72,6 +74,7 @@ type CmdType uint16 const ( CmdSubmitBatchTasks CmdType = 1 + iota + CmdCancelBatchTasks ) type ExecutorRequest struct { @@ -83,6 +86,10 @@ func (e *ExecutorRequest) SubmitBatchTasks() *pb.SubmitBatchTasksRequest { return e.Req.(*pb.SubmitBatchTasksRequest) } +func (e *ExecutorRequest) CancelBatchTasks() *pb.CancelBatchTasksRequest { + return e.Req.(*pb.CancelBatchTasksRequest) +} + type ExecutorResponse struct { Resp interface{} } diff --git a/master/jobmanager.go b/master/jobmanager.go index 23e0b4dbe00..517507181ca 100644 --- a/master/jobmanager.go +++ b/master/jobmanager.go @@ -49,6 +49,21 @@ func (j *JobManager) startImpl(ctx context.Context) { } } +func (j *JobManager) CancelJob(ctx context.Context, req *pb.CancelJobRequest) *pb.CancelJobResponse { + j.mu.Lock() + defer j.mu.Unlock() + job, ok := j.jobMasters[model.JobID(req.JobId)] + if !ok { + return &pb.CancelJobResponse{Err: &pb.Error{Message: "No such job"}} + } + err := job.Stop(ctx) + if err != nil { + return &pb.CancelJobResponse{Err: &pb.Error{Message: err.Error()}} + } + delete(j.jobMasters, model.JobID(req.JobId)) + return &pb.CancelJobResponse{} +} + // SubmitJob processes "SubmitJobRequest". func (j *JobManager) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) *pb.SubmitJobResponse { info := model.JobInfo{ diff --git a/master/jobmaster/benchmark/master.go b/master/jobmaster/benchmark/master.go index c17139f5067..f5fb63b2cdc 100644 --- a/master/jobmaster/benchmark/master.go +++ b/master/jobmaster/benchmark/master.go @@ -30,3 +30,16 @@ func (m *jobMaster) Start(ctx context.Context) error { // TODO: Start the tasks manager to communicate. return nil } + +func (m *jobMaster) Stop(ctx context.Context) error { + err := m.StopTasks(ctx, m.stage2) + if err != nil { + return err + } + err = m.StopTasks(ctx, m.stage1) + if err != nil { + return err + } + m.Cancel() + return nil +} diff --git a/master/jobmaster/system/impl.go b/master/jobmaster/system/impl.go index 068c13440c2..fdcce5be193 100644 --- a/master/jobmaster/system/impl.go +++ b/master/jobmaster/system/impl.go @@ -213,6 +213,42 @@ func (m *Master) DispatchTasks(ctx context.Context, tasks []*model.Task) error { return nil } +func (m *Master) StopTasks(ctx context.Context, tasks []*model.Task) error { + m.mu.Lock() + defer m.mu.Unlock() + arrange := make(map[model.ExecutorID][]int32) + for _, task := range tasks { + runningTask := m.runningTasks[task.ID] + li, ok := arrange[runningTask.exec] + if !ok { + arrange[runningTask.exec] = []int32{int32(task.ID)} + } else { + li = append(li, int32(task.ID)) + arrange[runningTask.exec] = li + } + } + var retErr error + for exec, taskList := range arrange { + req := &pb.CancelBatchTasksRequest{ + TaskIdList: taskList, + } + log.L().Info("begin to cancel tasks", zap.Int32("exec", int32(exec)), zap.Any("task", taskList)) + resp, err := m.client.Send(ctx, exec, &cluster.ExecutorRequest{ + Cmd: cluster.CmdCancelBatchTasks, + Req: req, + }) + if err != nil { + retErr = err + } else { + respErr := resp.Resp.(*pb.CancelBatchTasksResponse).Err + if respErr != nil { + retErr = stdErrors.New(respErr.Message) + } + } + } + return retErr +} + // Listen the events from every tasks func (m *Master) StartInternal() { // Register Listen Handler to Msg Servers diff --git a/master/jobmaster/system/interface.go b/master/jobmaster/system/interface.go index 60987695506..996dafc841b 100644 --- a/master/jobmaster/system/interface.go +++ b/master/jobmaster/system/interface.go @@ -12,6 +12,8 @@ type JobMaster interface { DispatchTasks(ctx context.Context, tasks []*model.Task) error // Start the job master. Start(ctx context.Context) error + // Stop the job master. + Stop(ctx context.Context) error // OfflineExecutor notifies the offlined executor to all the job masters. OfflineExecutor(eid model.ExecutorID) // ID returns the current job id. diff --git a/master/server.go b/master/server.go index be0c027adaa..9d136fca648 100644 --- a/master/server.go +++ b/master/server.go @@ -69,6 +69,10 @@ func (s *Server) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (*pb.S return s.jobManager.SubmitJob(ctx, req), nil } +func (s *Server) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error) { + return s.jobManager.CancelJob(ctx, req), nil +} + // RegisterExecutor implements grpc interface, and passes request onto executor manager. func (s *Server) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorRequest) (*pb.RegisterExecutorResponse, error) { // register executor to scheduler diff --git a/model/job.go b/model/job.go index 3cba0ee49fe..7e28c226f20 100644 --- a/model/job.go +++ b/model/job.go @@ -26,6 +26,7 @@ type Task struct { ID TaskID JobID JobID + Stage int Outputs []TaskID Inputs []TaskID diff --git a/pb/CDCPeerToPeer.pb.go b/pb/CDCPeerToPeer.pb.go deleted file mode 100644 index ff30d0d6ec5..00000000000 --- a/pb/CDCPeerToPeer.pb.go +++ /dev/null @@ -1,1552 +0,0 @@ -// Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: CDCPeerToPeer.proto - -package pb - -import ( - context "context" - fmt "fmt" - io "io" - math "math" - math_bits "math/bits" - - proto "github.com/gogo/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package - -type ExitReason int32 - -const ( - ExitReason_NONE ExitReason = 0 - ExitReason_CONGESTED ExitReason = 1 - ExitReason_CAPTURE_SUICIDE ExitReason = 2 - ExitReason_STALE_CONNECTION ExitReason = 3 - ExitReason_DUPLICATE_CONNECTION ExitReason = 4 - ExitReason_CAPTURE_ID_MISMATCH ExitReason = 5 - ExitReason_OTHER ExitReason = 100 -) - -var ExitReason_name = map[int32]string{ - 0: "NONE", - 1: "CONGESTED", - 2: "CAPTURE_SUICIDE", - 3: "STALE_CONNECTION", - 4: "DUPLICATE_CONNECTION", - 5: "CAPTURE_ID_MISMATCH", - 100: "OTHER", -} - -var ExitReason_value = map[string]int32{ - "NONE": 0, - "CONGESTED": 1, - "CAPTURE_SUICIDE": 2, - "STALE_CONNECTION": 3, - "DUPLICATE_CONNECTION": 4, - "CAPTURE_ID_MISMATCH": 5, - "OTHER": 100, -} - -func (x ExitReason) String() string { - return proto.EnumName(ExitReason_name, int32(x)) -} - -func (ExitReason) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_6560df28dddfd2cc, []int{0} -} - -type MessageEntry struct { - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Content []byte `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` - Sequence int64 `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"` -} - -func (m *MessageEntry) Reset() { *m = MessageEntry{} } -func (m *MessageEntry) String() string { return proto.CompactTextString(m) } -func (*MessageEntry) ProtoMessage() {} -func (*MessageEntry) Descriptor() ([]byte, []int) { - return fileDescriptor_6560df28dddfd2cc, []int{0} -} -func (m *MessageEntry) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *MessageEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_MessageEntry.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *MessageEntry) XXX_Merge(src proto.Message) { - xxx_messageInfo_MessageEntry.Merge(m, src) -} -func (m *MessageEntry) XXX_Size() int { - return m.Size() -} -func (m *MessageEntry) XXX_DiscardUnknown() { - xxx_messageInfo_MessageEntry.DiscardUnknown(m) -} - -var xxx_messageInfo_MessageEntry proto.InternalMessageInfo - -func (m *MessageEntry) GetTopic() string { - if m != nil { - return m.Topic - } - return "" -} - -func (m *MessageEntry) GetContent() []byte { - if m != nil { - return m.Content - } - return nil -} - -func (m *MessageEntry) GetSequence() int64 { - if m != nil { - return m.Sequence - } - return 0 -} - -type StreamMeta struct { - SenderId string `protobuf:"bytes,1,opt,name=sender_id,json=senderId,proto3" json:"sender_id,omitempty"` - ReceiverId string `protobuf:"bytes,2,opt,name=receiver_id,json=receiverId,proto3" json:"receiver_id,omitempty"` - Epoch int64 `protobuf:"varint,3,opt,name=epoch,proto3" json:"epoch,omitempty"` -} - -func (m *StreamMeta) Reset() { *m = StreamMeta{} } -func (m *StreamMeta) String() string { return proto.CompactTextString(m) } -func (*StreamMeta) ProtoMessage() {} -func (*StreamMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_6560df28dddfd2cc, []int{1} -} -func (m *StreamMeta) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *StreamMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_StreamMeta.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *StreamMeta) XXX_Merge(src proto.Message) { - xxx_messageInfo_StreamMeta.Merge(m, src) -} -func (m *StreamMeta) XXX_Size() int { - return m.Size() -} -func (m *StreamMeta) XXX_DiscardUnknown() { - xxx_messageInfo_StreamMeta.DiscardUnknown(m) -} - -var xxx_messageInfo_StreamMeta proto.InternalMessageInfo - -func (m *StreamMeta) GetSenderId() string { - if m != nil { - return m.SenderId - } - return "" -} - -func (m *StreamMeta) GetReceiverId() string { - if m != nil { - return m.ReceiverId - } - return "" -} - -func (m *StreamMeta) GetEpoch() int64 { - if m != nil { - return m.Epoch - } - return 0 -} - -type MessagePacket struct { - StreamMeta *StreamMeta `protobuf:"bytes,1,opt,name=stream_meta,json=streamMeta,proto3" json:"stream_meta,omitempty"` - Entries []*MessageEntry `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries,omitempty"` -} - -func (m *MessagePacket) Reset() { *m = MessagePacket{} } -func (m *MessagePacket) String() string { return proto.CompactTextString(m) } -func (*MessagePacket) ProtoMessage() {} -func (*MessagePacket) Descriptor() ([]byte, []int) { - return fileDescriptor_6560df28dddfd2cc, []int{2} -} -func (m *MessagePacket) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *MessagePacket) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_MessagePacket.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *MessagePacket) XXX_Merge(src proto.Message) { - xxx_messageInfo_MessagePacket.Merge(m, src) -} -func (m *MessagePacket) XXX_Size() int { - return m.Size() -} -func (m *MessagePacket) XXX_DiscardUnknown() { - xxx_messageInfo_MessagePacket.DiscardUnknown(m) -} - -var xxx_messageInfo_MessagePacket proto.InternalMessageInfo - -func (m *MessagePacket) GetStreamMeta() *StreamMeta { - if m != nil { - return m.StreamMeta - } - return nil -} - -func (m *MessagePacket) GetEntries() []*MessageEntry { - if m != nil { - return m.Entries - } - return nil -} - -type Ack struct { - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - LastSeq int64 `protobuf:"varint,2,opt,name=last_seq,json=lastSeq,proto3" json:"last_seq,omitempty"` -} - -func (m *Ack) Reset() { *m = Ack{} } -func (m *Ack) String() string { return proto.CompactTextString(m) } -func (*Ack) ProtoMessage() {} -func (*Ack) Descriptor() ([]byte, []int) { - return fileDescriptor_6560df28dddfd2cc, []int{3} -} -func (m *Ack) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Ack) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Ack.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Ack) XXX_Merge(src proto.Message) { - xxx_messageInfo_Ack.Merge(m, src) -} -func (m *Ack) XXX_Size() int { - return m.Size() -} -func (m *Ack) XXX_DiscardUnknown() { - xxx_messageInfo_Ack.DiscardUnknown(m) -} - -var xxx_messageInfo_Ack proto.InternalMessageInfo - -func (m *Ack) GetTopic() string { - if m != nil { - return m.Topic - } - return "" -} - -func (m *Ack) GetLastSeq() int64 { - if m != nil { - return m.LastSeq - } - return 0 -} - -type SendMessageResponse struct { - Ack []*Ack `protobuf:"bytes,1,rep,name=ack,proto3" json:"ack,omitempty"` - ExitReason ExitReason `protobuf:"varint,2,opt,name=exit_reason,json=exitReason,proto3,enum=pb.ExitReason" json:"exit_reason,omitempty"` - ErrorMessage string `protobuf:"bytes,3,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` -} - -func (m *SendMessageResponse) Reset() { *m = SendMessageResponse{} } -func (m *SendMessageResponse) String() string { return proto.CompactTextString(m) } -func (*SendMessageResponse) ProtoMessage() {} -func (*SendMessageResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_6560df28dddfd2cc, []int{4} -} -func (m *SendMessageResponse) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *SendMessageResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_SendMessageResponse.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *SendMessageResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_SendMessageResponse.Merge(m, src) -} -func (m *SendMessageResponse) XXX_Size() int { - return m.Size() -} -func (m *SendMessageResponse) XXX_DiscardUnknown() { - xxx_messageInfo_SendMessageResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_SendMessageResponse proto.InternalMessageInfo - -func (m *SendMessageResponse) GetAck() []*Ack { - if m != nil { - return m.Ack - } - return nil -} - -func (m *SendMessageResponse) GetExitReason() ExitReason { - if m != nil { - return m.ExitReason - } - return ExitReason_NONE -} - -func (m *SendMessageResponse) GetErrorMessage() string { - if m != nil { - return m.ErrorMessage - } - return "" -} - -func init() { - proto.RegisterEnum("pb.ExitReason", ExitReason_name, ExitReason_value) - proto.RegisterType((*MessageEntry)(nil), "pb.MessageEntry") - proto.RegisterType((*StreamMeta)(nil), "pb.StreamMeta") - proto.RegisterType((*MessagePacket)(nil), "pb.MessagePacket") - proto.RegisterType((*Ack)(nil), "pb.Ack") - proto.RegisterType((*SendMessageResponse)(nil), "pb.SendMessageResponse") -} - -func init() { proto.RegisterFile("CDCPeerToPeer.proto", fileDescriptor_6560df28dddfd2cc) } - -var fileDescriptor_6560df28dddfd2cc = []byte{ - // 516 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0x41, 0x8f, 0xd2, 0x40, - 0x14, 0x66, 0xe8, 0x22, 0xf0, 0x80, 0xb5, 0x0e, 0x24, 0xdb, 0xc5, 0xa4, 0x12, 0xbc, 0x90, 0x3d, - 0xa0, 0xc1, 0xc4, 0x9b, 0x87, 0x5a, 0x1a, 0x69, 0xb2, 0x14, 0x32, 0x2d, 0x17, 0x2f, 0xb5, 0xb4, - 0x2f, 0xda, 0xb0, 0xb4, 0xa5, 0x33, 0x9a, 0xf5, 0x0f, 0x78, 0x34, 0xfe, 0x2c, 0x8f, 0x7b, 0xf4, - 0x68, 0xe0, 0x8f, 0x98, 0xb6, 0xb0, 0xb0, 0x31, 0x7b, 0x69, 0xfa, 0x7d, 0x6f, 0xe6, 0x7d, 0xdf, - 0x7c, 0xef, 0x41, 0x5b, 0x1f, 0xeb, 0x73, 0xc4, 0xd4, 0x89, 0xb3, 0xef, 0x30, 0x49, 0x63, 0x11, - 0xd3, 0x72, 0xb2, 0xec, 0x7f, 0x84, 0xe6, 0x14, 0x39, 0xf7, 0x3e, 0xa3, 0x11, 0x89, 0xf4, 0x3b, - 0xed, 0x40, 0x45, 0xc4, 0x49, 0xe8, 0x2b, 0xa4, 0x47, 0x06, 0x75, 0x56, 0x00, 0xaa, 0x40, 0xd5, - 0x8f, 0x23, 0x81, 0x91, 0x50, 0xca, 0x3d, 0x32, 0x68, 0xb2, 0x03, 0xa4, 0x5d, 0xa8, 0x71, 0xdc, - 0x7c, 0xc5, 0xc8, 0x47, 0x45, 0xea, 0x91, 0x81, 0xc4, 0xee, 0x71, 0xff, 0x13, 0x80, 0x2d, 0x52, - 0xf4, 0xd6, 0x53, 0x14, 0x1e, 0x7d, 0x0e, 0x75, 0x8e, 0x51, 0x80, 0xa9, 0x1b, 0x06, 0xfb, 0xee, - 0xb5, 0x82, 0x30, 0x03, 0xfa, 0x02, 0x1a, 0x29, 0xfa, 0x18, 0x7e, 0x2b, 0xca, 0xe5, 0xbc, 0x0c, - 0x07, 0xca, 0x0c, 0x32, 0x5f, 0x98, 0xc4, 0xfe, 0x97, 0xbd, 0x48, 0x01, 0xfa, 0x37, 0xd0, 0xda, - 0xbb, 0x9f, 0x7b, 0xfe, 0x0a, 0x05, 0x7d, 0x05, 0x0d, 0x9e, 0x4b, 0xba, 0x6b, 0x14, 0x5e, 0x2e, - 0xd3, 0x18, 0x9d, 0x0f, 0x93, 0xe5, 0xf0, 0xe8, 0x84, 0x01, 0x3f, 0xba, 0xba, 0x82, 0x2a, 0x46, - 0x22, 0x0d, 0x91, 0x2b, 0xe5, 0x9e, 0x34, 0x68, 0x8c, 0xe4, 0xec, 0xf0, 0x69, 0x24, 0xec, 0x70, - 0xa0, 0xff, 0x16, 0x24, 0xcd, 0x5f, 0x3d, 0x12, 0xd1, 0x25, 0xd4, 0x6e, 0x3c, 0x2e, 0x5c, 0x8e, - 0x9b, 0xdc, 0xbe, 0xc4, 0xaa, 0x19, 0xb6, 0x71, 0xd3, 0xff, 0x41, 0xa0, 0x6d, 0x63, 0x14, 0xec, - 0xbb, 0x32, 0xe4, 0x49, 0x1c, 0x71, 0xa4, 0x97, 0x20, 0x79, 0xfe, 0x4a, 0x21, 0xb9, 0x6e, 0x35, - 0xd3, 0xd5, 0xfc, 0x15, 0xcb, 0xb8, 0xec, 0x1d, 0x78, 0x1b, 0x0a, 0x37, 0x45, 0x8f, 0xc7, 0x51, - 0xde, 0xf0, 0xbc, 0x78, 0x87, 0x71, 0x1b, 0x0a, 0x96, 0xb3, 0x0c, 0xf0, 0xfe, 0x9f, 0xbe, 0x84, - 0x16, 0xa6, 0x69, 0x9c, 0xba, 0xeb, 0x42, 0x24, 0xcf, 0xa9, 0xce, 0x9a, 0x39, 0xb9, 0x17, 0xbe, - 0xfa, 0x49, 0x00, 0x8e, 0xf7, 0x69, 0x0d, 0xce, 0xac, 0x99, 0x65, 0xc8, 0x25, 0xda, 0x82, 0xba, - 0x3e, 0xb3, 0x3e, 0x18, 0xb6, 0x63, 0x8c, 0x65, 0x42, 0xdb, 0xf0, 0x54, 0xd7, 0xe6, 0xce, 0x82, - 0x19, 0xae, 0xbd, 0x30, 0x75, 0x73, 0x6c, 0xc8, 0x65, 0xda, 0x01, 0xd9, 0x76, 0xb4, 0x6b, 0xc3, - 0xd5, 0x67, 0x96, 0x65, 0xe8, 0x8e, 0x39, 0xb3, 0x64, 0x89, 0x2a, 0xd0, 0x19, 0x2f, 0xe6, 0xd7, - 0xa6, 0xae, 0x39, 0x0f, 0x2a, 0x67, 0xf4, 0x02, 0xda, 0x87, 0x26, 0xe6, 0xd8, 0x9d, 0x9a, 0xf6, - 0x54, 0x73, 0xf4, 0x89, 0x5c, 0xa1, 0x75, 0xa8, 0xcc, 0x9c, 0x89, 0xc1, 0xe4, 0x60, 0x64, 0x41, - 0xeb, 0xc1, 0x62, 0xd2, 0x77, 0xd0, 0x38, 0x49, 0x8a, 0x3e, 0x3b, 0x19, 0x46, 0x31, 0xe1, 0xee, - 0x45, 0x3e, 0xcc, 0xff, 0xd3, 0x1c, 0x90, 0xd7, 0xe4, 0x7d, 0xf7, 0xf7, 0x56, 0x25, 0x77, 0x5b, - 0x95, 0xfc, 0xdd, 0xaa, 0xe4, 0xd7, 0x4e, 0x2d, 0xdd, 0xed, 0xd4, 0xd2, 0x9f, 0x9d, 0x5a, 0x9a, - 0x90, 0xe5, 0x93, 0x7c, 0xe9, 0xdf, 0xfc, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x77, 0x3b, 0xd5, 0xb6, - 0x0b, 0x03, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// CDCPeerToPeerClient is the client API for CDCPeerToPeer service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type CDCPeerToPeerClient interface { - SendMessage(ctx context.Context, opts ...grpc.CallOption) (CDCPeerToPeer_SendMessageClient, error) -} - -type cDCPeerToPeerClient struct { - cc *grpc.ClientConn -} - -func NewCDCPeerToPeerClient(cc *grpc.ClientConn) CDCPeerToPeerClient { - return &cDCPeerToPeerClient{cc} -} - -func (c *cDCPeerToPeerClient) SendMessage(ctx context.Context, opts ...grpc.CallOption) (CDCPeerToPeer_SendMessageClient, error) { - stream, err := c.cc.NewStream(ctx, &_CDCPeerToPeer_serviceDesc.Streams[0], "/pb.CDCPeerToPeer/SendMessage", opts...) - if err != nil { - return nil, err - } - x := &cDCPeerToPeerSendMessageClient{stream} - return x, nil -} - -type CDCPeerToPeer_SendMessageClient interface { - Send(*MessagePacket) error - Recv() (*SendMessageResponse, error) - grpc.ClientStream -} - -type cDCPeerToPeerSendMessageClient struct { - grpc.ClientStream -} - -func (x *cDCPeerToPeerSendMessageClient) Send(m *MessagePacket) error { - return x.ClientStream.SendMsg(m) -} - -func (x *cDCPeerToPeerSendMessageClient) Recv() (*SendMessageResponse, error) { - m := new(SendMessageResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// CDCPeerToPeerServer is the server API for CDCPeerToPeer service. -type CDCPeerToPeerServer interface { - SendMessage(CDCPeerToPeer_SendMessageServer) error -} - -// UnimplementedCDCPeerToPeerServer can be embedded to have forward compatible implementations. -type UnimplementedCDCPeerToPeerServer struct { -} - -func (*UnimplementedCDCPeerToPeerServer) SendMessage(srv CDCPeerToPeer_SendMessageServer) error { - return status.Errorf(codes.Unimplemented, "method SendMessage not implemented") -} - -func RegisterCDCPeerToPeerServer(s *grpc.Server, srv CDCPeerToPeerServer) { - s.RegisterService(&_CDCPeerToPeer_serviceDesc, srv) -} - -func _CDCPeerToPeer_SendMessage_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(CDCPeerToPeerServer).SendMessage(&cDCPeerToPeerSendMessageServer{stream}) -} - -type CDCPeerToPeer_SendMessageServer interface { - Send(*SendMessageResponse) error - Recv() (*MessagePacket, error) - grpc.ServerStream -} - -type cDCPeerToPeerSendMessageServer struct { - grpc.ServerStream -} - -func (x *cDCPeerToPeerSendMessageServer) Send(m *SendMessageResponse) error { - return x.ServerStream.SendMsg(m) -} - -func (x *cDCPeerToPeerSendMessageServer) Recv() (*MessagePacket, error) { - m := new(MessagePacket) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -var _CDCPeerToPeer_serviceDesc = grpc.ServiceDesc{ - ServiceName: "pb.CDCPeerToPeer", - HandlerType: (*CDCPeerToPeerServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "SendMessage", - Handler: _CDCPeerToPeer_SendMessage_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "CDCPeerToPeer.proto", -} - -func (m *MessageEntry) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *MessageEntry) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *MessageEntry) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Sequence != 0 { - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.Sequence)) - i-- - dAtA[i] = 0x18 - } - if len(m.Content) > 0 { - i -= len(m.Content) - copy(dAtA[i:], m.Content) - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.Content))) - i-- - dAtA[i] = 0x12 - } - if len(m.Topic) > 0 { - i -= len(m.Topic) - copy(dAtA[i:], m.Topic) - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.Topic))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *StreamMeta) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *StreamMeta) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *StreamMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.Epoch != 0 { - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.Epoch)) - i-- - dAtA[i] = 0x18 - } - if len(m.ReceiverId) > 0 { - i -= len(m.ReceiverId) - copy(dAtA[i:], m.ReceiverId) - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.ReceiverId))) - i-- - dAtA[i] = 0x12 - } - if len(m.SenderId) > 0 { - i -= len(m.SenderId) - copy(dAtA[i:], m.SenderId) - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.SenderId))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *MessagePacket) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *MessagePacket) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *MessagePacket) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.Entries) > 0 { - for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if m.StreamMeta != nil { - { - size, err := m.StreamMeta.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *Ack) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Ack) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Ack) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.LastSeq != 0 { - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.LastSeq)) - i-- - dAtA[i] = 0x10 - } - if len(m.Topic) > 0 { - i -= len(m.Topic) - copy(dAtA[i:], m.Topic) - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.Topic))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - -func (m *SendMessageResponse) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *SendMessageResponse) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *SendMessageResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if len(m.ErrorMessage) > 0 { - i -= len(m.ErrorMessage) - copy(dAtA[i:], m.ErrorMessage) - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(len(m.ErrorMessage))) - i-- - dAtA[i] = 0x1a - } - if m.ExitReason != 0 { - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(m.ExitReason)) - i-- - dAtA[i] = 0x10 - } - if len(m.Ack) > 0 { - for iNdEx := len(m.Ack) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Ack[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintCDCPeerToPeer(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0xa - } - } - return len(dAtA) - i, nil -} - -func encodeVarintCDCPeerToPeer(dAtA []byte, offset int, v uint64) int { - offset -= sovCDCPeerToPeer(v) - base := offset - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return base -} -func (m *MessageEntry) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Topic) - if l > 0 { - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - l = len(m.Content) - if l > 0 { - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - if m.Sequence != 0 { - n += 1 + sovCDCPeerToPeer(uint64(m.Sequence)) - } - return n -} - -func (m *StreamMeta) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.SenderId) - if l > 0 { - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - l = len(m.ReceiverId) - if l > 0 { - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - if m.Epoch != 0 { - n += 1 + sovCDCPeerToPeer(uint64(m.Epoch)) - } - return n -} - -func (m *MessagePacket) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.StreamMeta != nil { - l = m.StreamMeta.Size() - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - if len(m.Entries) > 0 { - for _, e := range m.Entries { - l = e.Size() - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - } - return n -} - -func (m *Ack) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Topic) - if l > 0 { - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - if m.LastSeq != 0 { - n += 1 + sovCDCPeerToPeer(uint64(m.LastSeq)) - } - return n -} - -func (m *SendMessageResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.Ack) > 0 { - for _, e := range m.Ack { - l = e.Size() - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - } - if m.ExitReason != 0 { - n += 1 + sovCDCPeerToPeer(uint64(m.ExitReason)) - } - l = len(m.ErrorMessage) - if l > 0 { - n += 1 + l + sovCDCPeerToPeer(uint64(l)) - } - return n -} - -func sovCDCPeerToPeer(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} -func sozCDCPeerToPeer(x uint64) (n int) { - return sovCDCPeerToPeer(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (m *MessageEntry) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: MessageEntry: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: MessageEntry: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Topic = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Content", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Content = append(m.Content[:0], dAtA[iNdEx:postIndex]...) - if m.Content == nil { - m.Content = []byte{} - } - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Sequence", wireType) - } - m.Sequence = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Sequence |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *StreamMeta) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: StreamMeta: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: StreamMeta: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field SenderId", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.SenderId = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ReceiverId", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.ReceiverId = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Epoch", wireType) - } - m.Epoch = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Epoch |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *MessagePacket) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: MessagePacket: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: MessagePacket: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field StreamMeta", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.StreamMeta == nil { - m.StreamMeta = &StreamMeta{} - } - if err := m.StreamMeta.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Entries = append(m.Entries, &MessageEntry{}) - if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Ack) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Ack: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Ack: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Topic = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LastSeq", wireType) - } - m.LastSeq = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.LastSeq |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *SendMessageResponse) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: SendMessageResponse: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: SendMessageResponse: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Ack", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Ack = append(m.Ack, &Ack{}) - if err := m.Ack[len(m.Ack)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field ExitReason", wireType) - } - m.ExitReason = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.ExitReason |= ExitReason(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field ErrorMessage", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.ErrorMessage = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipCDCPeerToPeer(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthCDCPeerToPeer - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipCDCPeerToPeer(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - depth := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - case 1: - iNdEx += 8 - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowCDCPeerToPeer - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if length < 0 { - return 0, ErrInvalidLengthCDCPeerToPeer - } - iNdEx += length - case 3: - depth++ - case 4: - if depth == 0 { - return 0, ErrUnexpectedEndOfGroupCDCPeerToPeer - } - depth-- - case 5: - iNdEx += 4 - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - if iNdEx < 0 { - return 0, ErrInvalidLengthCDCPeerToPeer - } - if depth == 0 { - return iNdEx, nil - } - } - return 0, io.ErrUnexpectedEOF -} - -var ( - ErrInvalidLengthCDCPeerToPeer = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowCDCPeerToPeer = fmt.Errorf("proto: integer overflow") - ErrUnexpectedEndOfGroupCDCPeerToPeer = fmt.Errorf("proto: unexpected end of group") -) diff --git a/pb/master.pb.go b/pb/master.pb.go index 3cfeb588735..7ad005e3bb4 100644 --- a/pb/master.pb.go +++ b/pb/master.pb.go @@ -237,6 +237,50 @@ func (m *SubmitJobRequest) GetUser() string { return "" } +type CancelJobRequest struct { + JobId int32 `protobuf:"varint,1,opt,name=job_id,json=jobId,proto3" json:"job_id,omitempty"` +} + +func (m *CancelJobRequest) Reset() { *m = CancelJobRequest{} } +func (m *CancelJobRequest) String() string { return proto.CompactTextString(m) } +func (*CancelJobRequest) ProtoMessage() {} +func (*CancelJobRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_f9c348dec43a6705, []int{3} +} +func (m *CancelJobRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CancelJobRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CancelJobRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CancelJobRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CancelJobRequest.Merge(m, src) +} +func (m *CancelJobRequest) XXX_Size() int { + return m.Size() +} +func (m *CancelJobRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CancelJobRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CancelJobRequest proto.InternalMessageInfo + +func (m *CancelJobRequest) GetJobId() int32 { + if m != nil { + return m.JobId + } + return 0 +} + type SubmitJobResponse struct { Err *Error `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"` JobId int32 `protobuf:"varint,2,opt,name=job_id,json=jobId,proto3" json:"job_id,omitempty"` @@ -246,7 +290,7 @@ func (m *SubmitJobResponse) Reset() { *m = SubmitJobResponse{} } func (m *SubmitJobResponse) String() string { return proto.CompactTextString(m) } func (*SubmitJobResponse) ProtoMessage() {} func (*SubmitJobResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{3} + return fileDescriptor_f9c348dec43a6705, []int{4} } func (m *SubmitJobResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -289,6 +333,50 @@ func (m *SubmitJobResponse) GetJobId() int32 { return 0 } +type CancelJobResponse struct { + Err *Error `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"` +} + +func (m *CancelJobResponse) Reset() { *m = CancelJobResponse{} } +func (m *CancelJobResponse) String() string { return proto.CompactTextString(m) } +func (*CancelJobResponse) ProtoMessage() {} +func (*CancelJobResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_f9c348dec43a6705, []int{5} +} +func (m *CancelJobResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CancelJobResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CancelJobResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CancelJobResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CancelJobResponse.Merge(m, src) +} +func (m *CancelJobResponse) XXX_Size() int { + return m.Size() +} +func (m *CancelJobResponse) XXX_DiscardUnknown() { + xxx_messageInfo_CancelJobResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_CancelJobResponse proto.InternalMessageInfo + +func (m *CancelJobResponse) GetErr() *Error { + if m != nil { + return m.Err + } + return nil +} + type RegisterExecutorRequest struct { // dm need 'worker-name' to locate the worker. // TODO: Do we really need a "worker name"? Can we use address to identify an executor? @@ -301,7 +389,7 @@ func (m *RegisterExecutorRequest) Reset() { *m = RegisterExecutorRequest func (m *RegisterExecutorRequest) String() string { return proto.CompactTextString(m) } func (*RegisterExecutorRequest) ProtoMessage() {} func (*RegisterExecutorRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{4} + return fileDescriptor_f9c348dec43a6705, []int{6} } func (m *RegisterExecutorRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -360,7 +448,7 @@ func (m *RegisterExecutorResponse) Reset() { *m = RegisterExecutorRespon func (m *RegisterExecutorResponse) String() string { return proto.CompactTextString(m) } func (*RegisterExecutorResponse) ProtoMessage() {} func (*RegisterExecutorResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{5} + return fileDescriptor_f9c348dec43a6705, []int{7} } func (m *RegisterExecutorResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -413,7 +501,7 @@ func (m *ScheduleTask) Reset() { *m = ScheduleTask{} } func (m *ScheduleTask) String() string { return proto.CompactTextString(m) } func (*ScheduleTask) ProtoMessage() {} func (*ScheduleTask) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{6} + return fileDescriptor_f9c348dec43a6705, []int{8} } func (m *ScheduleTask) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -474,7 +562,7 @@ func (m *TaskSchedulerRequest) Reset() { *m = TaskSchedulerRequest{} } func (m *TaskSchedulerRequest) String() string { return proto.CompactTextString(m) } func (*TaskSchedulerRequest) ProtoMessage() {} func (*TaskSchedulerRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{7} + return fileDescriptor_f9c348dec43a6705, []int{9} } func (m *TaskSchedulerRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -520,7 +608,7 @@ func (m *ScheduleResult) Reset() { *m = ScheduleResult{} } func (m *ScheduleResult) String() string { return proto.CompactTextString(m) } func (*ScheduleResult) ProtoMessage() {} func (*ScheduleResult) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{8} + return fileDescriptor_f9c348dec43a6705, []int{10} } func (m *ScheduleResult) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -564,7 +652,7 @@ func (m *TaskSchedulerResponse) Reset() { *m = TaskSchedulerResponse{} } func (m *TaskSchedulerResponse) String() string { return proto.CompactTextString(m) } func (*TaskSchedulerResponse) ProtoMessage() {} func (*TaskSchedulerResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_f9c348dec43a6705, []int{9} + return fileDescriptor_f9c348dec43a6705, []int{11} } func (m *TaskSchedulerResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -605,7 +693,9 @@ func init() { proto.RegisterType((*HeartbeatRequest)(nil), "pb.HeartbeatRequest") proto.RegisterType((*HeartbeatResponse)(nil), "pb.HeartbeatResponse") proto.RegisterType((*SubmitJobRequest)(nil), "pb.SubmitJobRequest") + proto.RegisterType((*CancelJobRequest)(nil), "pb.CancelJobRequest") proto.RegisterType((*SubmitJobResponse)(nil), "pb.SubmitJobResponse") + proto.RegisterType((*CancelJobResponse)(nil), "pb.CancelJobResponse") proto.RegisterType((*RegisterExecutorRequest)(nil), "pb.RegisterExecutorRequest") proto.RegisterType((*RegisterExecutorResponse)(nil), "pb.RegisterExecutorResponse") proto.RegisterType((*ScheduleTask)(nil), "pb.ScheduleTask") @@ -618,53 +708,55 @@ func init() { func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) } var fileDescriptor_f9c348dec43a6705 = []byte{ - // 729 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0x4d, 0x4f, 0x1b, 0x49, - 0x10, 0xf5, 0xcc, 0xd8, 0x66, 0x5d, 0x06, 0x33, 0x6e, 0xc1, 0xee, 0xac, 0x61, 0xbd, 0x68, 0x56, - 0xbb, 0xeb, 0x95, 0x36, 0x56, 0xe2, 0x5c, 0x22, 0x0e, 0x39, 0x00, 0x56, 0x02, 0x0a, 0x42, 0x69, - 0x88, 0x94, 0x1b, 0xea, 0x19, 0x17, 0x30, 0xf8, 0x63, 0x26, 0xdd, 0x3d, 0x28, 0xfe, 0x17, 0xb9, - 0xe5, 0x9e, 0x7b, 0xce, 0xf9, 0x0b, 0x39, 0x72, 0xcc, 0x31, 0x82, 0x3f, 0x12, 0x75, 0xcf, 0x07, - 0x66, 0x30, 0x0a, 0xb7, 0xee, 0xf7, 0xba, 0x5e, 0xd5, 0xeb, 0xea, 0x2e, 0x58, 0x1c, 0x33, 0x21, - 0x91, 0x77, 0x23, 0x1e, 0xca, 0x90, 0x98, 0x91, 0xd7, 0xaa, 0x23, 0xe7, 0x61, 0x0a, 0xb4, 0x1a, - 0xf8, 0x1e, 0xfd, 0x58, 0xe6, 0xfb, 0xe5, 0x31, 0x4a, 0x26, 0x64, 0xc8, 0x31, 0x01, 0xdc, 0x4f, - 0x06, 0xd8, 0x2f, 0x91, 0x71, 0xe9, 0x21, 0x93, 0x14, 0xdf, 0xc5, 0x28, 0x24, 0xf9, 0x13, 0xea, - 0x59, 0xdc, 0x71, 0x30, 0x70, 0x8c, 0x0d, 0xa3, 0x53, 0xa1, 0x90, 0x41, 0xbb, 0x03, 0xf2, 0x37, - 0x34, 0x38, 0x8a, 0x30, 0xe6, 0x3e, 0x1e, 0xc7, 0x82, 0x9d, 0xa2, 0x63, 0xea, 0x33, 0x4b, 0x19, - 0xfa, 0x46, 0x81, 0xe4, 0x57, 0xa8, 0x0a, 0xc9, 0x64, 0x2c, 0x1c, 0x4b, 0xd3, 0xe9, 0x8e, 0xac, - 0x43, 0x4d, 0x06, 0x63, 0x14, 0x92, 0x8d, 0x23, 0xa7, 0xbc, 0x61, 0x74, 0xca, 0xf4, 0x06, 0x20, - 0x36, 0x58, 0x52, 0x8e, 0x9c, 0x8a, 0xc6, 0xd5, 0xd2, 0x7d, 0x0c, 0xcd, 0x99, 0x1a, 0x45, 0x14, - 0x4e, 0x04, 0x92, 0x35, 0xb0, 0x90, 0x73, 0x5d, 0x5c, 0xbd, 0x57, 0xeb, 0x46, 0x5e, 0xb7, 0xaf, - 0x8c, 0x53, 0x85, 0xba, 0x1f, 0x0d, 0xb0, 0x0f, 0x63, 0x6f, 0x1c, 0xc8, 0xbd, 0xd0, 0xcb, 0x6c, - 0xfd, 0x0f, 0xa6, 0x8c, 0x74, 0x40, 0xa3, 0xb7, 0xae, 0x02, 0x8a, 0x27, 0xba, 0x7b, 0xa1, 0x77, - 0x34, 0x8d, 0x90, 0x9a, 0x32, 0x52, 0xc5, 0xfb, 0xe1, 0xe4, 0x24, 0x38, 0xd5, 0xde, 0x16, 0x69, - 0xba, 0x23, 0x04, 0xca, 0xb1, 0x40, 0xae, 0x2d, 0xd5, 0xa8, 0x5e, 0xbb, 0xff, 0xc1, 0x42, 0x1a, - 0x4a, 0xaa, 0x60, 0xee, 0xec, 0xdb, 0x25, 0xb2, 0x00, 0xd6, 0xf6, 0xce, 0xb6, 0x6d, 0x90, 0x25, - 0xa8, 0x6d, 0xe1, 0xc4, 0x3f, 0x1b, 0x33, 0x3e, 0xb4, 0x4d, 0xf7, 0x05, 0x34, 0x67, 0xd2, 0x3e, - 0xc0, 0x0b, 0x59, 0x85, 0xea, 0x79, 0xe8, 0xa9, 0x46, 0x24, 0x97, 0x5c, 0x39, 0x0f, 0xbd, 0xdd, - 0x81, 0x3b, 0x86, 0xdf, 0x28, 0x9e, 0x06, 0xaa, 0xfb, 0xfd, 0xb4, 0x33, 0x99, 0x51, 0x07, 0x16, - 0xd8, 0x60, 0xc0, 0x51, 0x08, 0x2d, 0x59, 0xa3, 0xd9, 0x56, 0x31, 0x17, 0xc8, 0x45, 0x10, 0x4e, - 0xb4, 0x58, 0x8d, 0x66, 0x5b, 0xd2, 0x06, 0xf0, 0x59, 0xc4, 0xbc, 0x60, 0x14, 0xc8, 0xa9, 0x36, - 0x67, 0xd1, 0x19, 0xc4, 0x7d, 0x0b, 0xce, 0xdd, 0x74, 0x0f, 0x29, 0xbf, 0xf0, 0x98, 0xcc, 0xe2, - 0x63, 0x72, 0x2f, 0x60, 0xf1, 0xd0, 0x3f, 0xc3, 0x41, 0x3c, 0xc2, 0x23, 0x26, 0x86, 0xe4, 0x2f, - 0x28, 0x4b, 0x26, 0x86, 0xa9, 0xdc, 0xb2, 0x92, 0x53, 0x78, 0x6a, 0x8e, 0x6a, 0x52, 0x75, 0xc1, - 0x0f, 0x85, 0xd4, 0x72, 0x16, 0xd5, 0x6b, 0xf2, 0x08, 0x48, 0xc4, 0xf1, 0x04, 0x39, 0xc7, 0xc1, - 0xf1, 0x28, 0xf4, 0x99, 0x54, 0x3e, 0x93, 0x3e, 0x35, 0x73, 0xe6, 0x55, 0x4a, 0xb8, 0xcf, 0x61, - 0x45, 0xe9, 0x66, 0xb9, 0xf3, 0xdb, 0xfb, 0x07, 0x2a, 0x2a, 0x85, 0xba, 0x3b, 0xab, 0x53, 0xef, - 0xd9, 0xfa, 0xa5, 0xcc, 0x14, 0x48, 0x13, 0xda, 0x7d, 0x02, 0x8d, 0x0c, 0xa6, 0x28, 0xe2, 0xd1, - 0xcf, 0xff, 0x8d, 0xfb, 0xd9, 0x80, 0xd5, 0x42, 0xce, 0xf4, 0x0a, 0xb7, 0xe1, 0x17, 0x91, 0x82, - 0x69, 0xde, 0x7f, 0x33, 0xe3, 0x77, 0x0e, 0xe7, 0xd5, 0xf4, 0x27, 0x92, 0x4f, 0x69, 0x1e, 0xd8, - 0x3a, 0x80, 0xa5, 0x5b, 0x94, 0xfa, 0x4a, 0x43, 0x9c, 0xa6, 0x85, 0xa8, 0x25, 0xe9, 0x40, 0xe5, - 0x82, 0x8d, 0xe2, 0xe4, 0xc3, 0xd6, 0x7b, 0x64, 0xd6, 0x5c, 0xe2, 0x82, 0x26, 0x07, 0x36, 0xcd, - 0x67, 0x46, 0xef, 0x8b, 0x05, 0xd5, 0x7d, 0x3d, 0x60, 0xc8, 0x01, 0xd8, 0xc5, 0xfe, 0x93, 0x35, - 0x15, 0x7d, 0xcf, 0x23, 0x6c, 0xad, 0xcf, 0x27, 0x13, 0x0b, 0x6e, 0x89, 0x6c, 0x42, 0x2d, 0xff, - 0x08, 0x64, 0x65, 0xde, 0x77, 0x6c, 0xad, 0x16, 0xd0, 0xd9, 0xd8, 0x7c, 0x20, 0x24, 0xb1, 0xc5, - 0x19, 0x96, 0xc4, 0xde, 0x99, 0x1a, 0x6e, 0x89, 0xf4, 0x0b, 0xcf, 0xcd, 0x99, 0x73, 0xcf, 0x89, - 0xc4, 0xef, 0xf7, 0x76, 0xc0, 0x2d, 0x11, 0x0a, 0xcd, 0xcc, 0xdc, 0x3e, 0x4a, 0x76, 0xa8, 0x66, - 0x2a, 0xb9, 0xe5, 0x39, 0x87, 0x33, 0xbd, 0x3f, 0xee, 0x61, 0x73, 0xcd, 0x5d, 0x68, 0xbc, 0x8e, - 0x91, 0x4f, 0x6f, 0x04, 0x75, 0x09, 0xb7, 0xb1, 0x4c, 0xad, 0x35, 0x8f, 0xca, 0xa4, 0xb6, 0x9c, - 0xaf, 0x57, 0x6d, 0xe3, 0xf2, 0xaa, 0x6d, 0x7c, 0xbf, 0x6a, 0x1b, 0x1f, 0xae, 0xdb, 0xa5, 0xcb, - 0xeb, 0x76, 0xe9, 0xdb, 0x75, 0xbb, 0xe4, 0x55, 0xf5, 0xe0, 0x7f, 0xfa, 0x23, 0x00, 0x00, 0xff, - 0xff, 0x38, 0x8a, 0xf9, 0xdc, 0x3a, 0x06, 0x00, 0x00, + // 761 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x55, 0xcf, 0x4e, 0xfb, 0x46, + 0x10, 0x8e, 0xed, 0x24, 0x34, 0x13, 0x08, 0xce, 0x0a, 0x5a, 0x37, 0xd0, 0x14, 0xb9, 0x6a, 0x1b, + 0xa4, 0x36, 0xa2, 0xe9, 0xa5, 0xe2, 0xd0, 0x03, 0x21, 0x6a, 0x41, 0x45, 0xa8, 0x0b, 0x95, 0x7a, + 0x43, 0x6b, 0x67, 0x00, 0x93, 0x3f, 0x76, 0x77, 0xd7, 0xa8, 0x79, 0x8b, 0xde, 0x7a, 0xef, 0xbd, + 0xef, 0xd1, 0x23, 0xc7, 0x1e, 0x2b, 0x78, 0x84, 0xbe, 0xc0, 0x4f, 0xbb, 0xfe, 0x83, 0x71, 0x82, + 0x7e, 0xdc, 0xbc, 0xdf, 0xec, 0x37, 0x33, 0xdf, 0xec, 0xcc, 0x18, 0xd6, 0x67, 0x4c, 0x48, 0xe4, + 0xfd, 0x88, 0x87, 0x32, 0x24, 0x66, 0xe4, 0x75, 0x9a, 0xc8, 0x79, 0x98, 0x02, 0x9d, 0x16, 0xfe, + 0x8e, 0x7e, 0x2c, 0xf3, 0xf3, 0xe6, 0x0c, 0x25, 0x13, 0x32, 0xe4, 0x98, 0x00, 0xee, 0x5f, 0x06, + 0xd8, 0x3f, 0x22, 0xe3, 0xd2, 0x43, 0x26, 0x29, 0xfe, 0x16, 0xa3, 0x90, 0xe4, 0x53, 0x68, 0x66, + 0xbc, 0xab, 0x60, 0xec, 0x18, 0x7b, 0x46, 0xaf, 0x46, 0x21, 0x83, 0x4e, 0xc6, 0xe4, 0x73, 0x68, + 0x71, 0x14, 0x61, 0xcc, 0x7d, 0xbc, 0x8a, 0x05, 0xbb, 0x41, 0xc7, 0xd4, 0x77, 0x36, 0x32, 0xf4, + 0x17, 0x05, 0x92, 0x0f, 0xa1, 0x2e, 0x24, 0x93, 0xb1, 0x70, 0x2c, 0x6d, 0x4e, 0x4f, 0x64, 0x17, + 0x1a, 0x32, 0x98, 0xa1, 0x90, 0x6c, 0x16, 0x39, 0xd5, 0x3d, 0xa3, 0x57, 0xa5, 0xcf, 0x00, 0xb1, + 0xc1, 0x92, 0x72, 0xea, 0xd4, 0x34, 0xae, 0x3e, 0xdd, 0x03, 0x68, 0x17, 0x72, 0x14, 0x51, 0x38, + 0x17, 0x48, 0x76, 0xc0, 0x42, 0xce, 0x75, 0x72, 0xcd, 0x41, 0xa3, 0x1f, 0x79, 0xfd, 0x91, 0x12, + 0x4e, 0x15, 0xea, 0xfe, 0x69, 0x80, 0x7d, 0x11, 0x7b, 0xb3, 0x40, 0x9e, 0x86, 0x5e, 0x26, 0xeb, + 0x2b, 0x30, 0x65, 0xa4, 0x09, 0xad, 0xc1, 0xae, 0x22, 0x94, 0x6f, 0xf4, 0x4f, 0x43, 0xef, 0x72, + 0x11, 0x21, 0x35, 0x65, 0xa4, 0x92, 0xf7, 0xc3, 0xf9, 0x75, 0x70, 0xa3, 0xb5, 0xad, 0xd3, 0xf4, + 0x44, 0x08, 0x54, 0x63, 0x81, 0x5c, 0x4b, 0x6a, 0x50, 0xfd, 0xed, 0xee, 0xc3, 0x5a, 0x4a, 0x25, + 0x75, 0x30, 0x8f, 0xcf, 0xec, 0x0a, 0x59, 0x03, 0x6b, 0x78, 0x3c, 0xb4, 0x0d, 0xb2, 0x01, 0x8d, + 0x23, 0x9c, 0xfb, 0xb7, 0x33, 0xc6, 0x27, 0xb6, 0xe9, 0xee, 0x83, 0x3d, 0x64, 0x73, 0x1f, 0xa7, + 0x85, 0xc4, 0xb6, 0xa1, 0x7e, 0x17, 0x7a, 0xcf, 0xa5, 0xae, 0xdd, 0x85, 0xde, 0xc9, 0xd8, 0xfd, + 0x01, 0xda, 0x85, 0x0c, 0xdf, 0x20, 0xbb, 0xe0, 0xc8, 0x2c, 0x3a, 0x3a, 0x80, 0x76, 0x21, 0xe6, + 0x5b, 0xea, 0x37, 0x83, 0x8f, 0x28, 0xde, 0x04, 0xaa, 0xb5, 0x46, 0xe9, 0xb3, 0x67, 0xc9, 0x3a, + 0xb0, 0xc6, 0xc6, 0x63, 0x8e, 0x42, 0x68, 0x6e, 0x83, 0x66, 0x47, 0x65, 0xb9, 0x47, 0x2e, 0x82, + 0x70, 0xae, 0xc3, 0x37, 0x68, 0x76, 0x24, 0x5d, 0x00, 0x9f, 0x45, 0xcc, 0x0b, 0xa6, 0x81, 0x5c, + 0xe8, 0xca, 0x59, 0xb4, 0x80, 0xb8, 0xbf, 0x82, 0xb3, 0x1c, 0xee, 0x2d, 0x82, 0x4b, 0x9d, 0x6a, + 0x96, 0x3b, 0xd5, 0xbd, 0x87, 0xf5, 0x0b, 0xff, 0x16, 0xc7, 0xf1, 0x14, 0x2f, 0x99, 0x98, 0x90, + 0xcf, 0xa0, 0x2a, 0x99, 0x98, 0xa4, 0xee, 0x36, 0x95, 0x3b, 0x85, 0xa7, 0xe2, 0xa8, 0x36, 0xaa, + 0x27, 0xf6, 0x43, 0x21, 0xb5, 0x3b, 0x8b, 0xea, 0x6f, 0xf2, 0x35, 0x90, 0x88, 0xe3, 0x35, 0x72, + 0x8e, 0xe3, 0xab, 0x69, 0xe8, 0x33, 0xa9, 0x74, 0x26, 0x4d, 0xd0, 0xce, 0x2d, 0x3f, 0xa5, 0x06, + 0xf7, 0x7b, 0xd8, 0x52, 0x7e, 0xb3, 0xd8, 0x79, 0xf5, 0xbe, 0x80, 0x9a, 0x0a, 0xa1, 0x6a, 0x67, + 0xf5, 0x9a, 0x03, 0x5b, 0xb7, 0x61, 0x21, 0x41, 0x9a, 0x98, 0xdd, 0x6f, 0xa0, 0x95, 0xc1, 0x14, + 0x45, 0x3c, 0x7d, 0xff, 0x50, 0xba, 0x7f, 0x1b, 0xb0, 0x5d, 0x8a, 0x99, 0x96, 0x70, 0x08, 0x1f, + 0x88, 0x14, 0x4c, 0xe3, 0x7e, 0x99, 0x09, 0x5f, 0xba, 0x9c, 0x67, 0x33, 0x9a, 0x4b, 0xbe, 0xa0, + 0x39, 0xb1, 0x73, 0x0e, 0x1b, 0x2f, 0x4c, 0x6a, 0x4e, 0x27, 0xb8, 0x48, 0x13, 0x51, 0x9f, 0xa4, + 0x07, 0xb5, 0x7b, 0x36, 0x8d, 0x93, 0x6d, 0xd0, 0x1c, 0x90, 0xa2, 0xb8, 0x44, 0x05, 0x4d, 0x2e, + 0x1c, 0x9a, 0xdf, 0x19, 0x83, 0xff, 0x2d, 0xa8, 0x9f, 0xe9, 0xed, 0x45, 0xce, 0xc1, 0x2e, 0xbf, + 0x3f, 0xd9, 0x51, 0xec, 0x57, 0x9a, 0xb0, 0xb3, 0xbb, 0xda, 0x98, 0x48, 0x70, 0x2b, 0xe4, 0x10, + 0x1a, 0xf9, 0xe8, 0x90, 0xad, 0x55, 0xb3, 0xde, 0xd9, 0x2e, 0xa1, 0x45, 0x6e, 0x3e, 0x2d, 0x09, + 0xb7, 0x3c, 0xb0, 0x09, 0x77, 0x69, 0xa4, 0x12, 0x6e, 0xbe, 0xa9, 0x12, 0x6e, 0x79, 0xb9, 0x26, + 0xdc, 0xa5, 0x75, 0xe6, 0x56, 0xc8, 0xa8, 0xd4, 0xaa, 0xce, 0x8a, 0x37, 0x4a, 0x5c, 0x7c, 0xfc, + 0xea, 0xeb, 0xb9, 0x15, 0x42, 0xa1, 0x9d, 0x15, 0xe6, 0x0c, 0x25, 0xbb, 0x50, 0xcb, 0x9e, 0xbc, + 0xa8, 0x57, 0x0e, 0x67, 0xfe, 0x3e, 0x79, 0xc5, 0x9a, 0xfb, 0x3c, 0x81, 0xd6, 0xcf, 0x31, 0xf2, + 0xc5, 0xb3, 0x43, 0x9d, 0xc2, 0x4b, 0x2c, 0xf3, 0xd6, 0x59, 0x65, 0xca, 0x5c, 0x1d, 0x39, 0xff, + 0x3c, 0x76, 0x8d, 0x87, 0xc7, 0xae, 0xf1, 0xdf, 0x63, 0xd7, 0xf8, 0xe3, 0xa9, 0x5b, 0x79, 0x78, + 0xea, 0x56, 0xfe, 0x7d, 0xea, 0x56, 0xbc, 0xba, 0xfe, 0x23, 0x7d, 0xfb, 0x2e, 0x00, 0x00, 0xff, + 0xff, 0xeb, 0xfd, 0xc5, 0x8d, 0xd3, 0x06, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -681,6 +773,7 @@ const _ = grpc.SupportPackageIsVersion4 type MasterClient interface { RegisterExecutor(ctx context.Context, in *RegisterExecutorRequest, opts ...grpc.CallOption) (*RegisterExecutorResponse, error) SubmitJob(ctx context.Context, in *SubmitJobRequest, opts ...grpc.CallOption) (*SubmitJobResponse, error) + CancelJob(ctx context.Context, in *CancelJobRequest, opts ...grpc.CallOption) (*CancelJobResponse, error) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) ScheduleTask(ctx context.Context, in *TaskSchedulerRequest, opts ...grpc.CallOption) (*TaskSchedulerResponse, error) // RegisterMetaStore is called from backend metastore and @@ -717,6 +810,15 @@ func (c *masterClient) SubmitJob(ctx context.Context, in *SubmitJobRequest, opts return out, nil } +func (c *masterClient) CancelJob(ctx context.Context, in *CancelJobRequest, opts ...grpc.CallOption) (*CancelJobResponse, error) { + out := new(CancelJobResponse) + err := c.cc.Invoke(ctx, "/pb.Master/CancelJob", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *masterClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) { out := new(HeartbeatResponse) err := c.cc.Invoke(ctx, "/pb.Master/Heartbeat", in, out, opts...) @@ -757,6 +859,7 @@ func (c *masterClient) QueryMetaStore(ctx context.Context, in *QueryMetaStoreReq type MasterServer interface { RegisterExecutor(context.Context, *RegisterExecutorRequest) (*RegisterExecutorResponse, error) SubmitJob(context.Context, *SubmitJobRequest) (*SubmitJobResponse, error) + CancelJob(context.Context, *CancelJobRequest) (*CancelJobResponse, error) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) ScheduleTask(context.Context, *TaskSchedulerRequest) (*TaskSchedulerResponse, error) // RegisterMetaStore is called from backend metastore and @@ -777,6 +880,9 @@ func (*UnimplementedMasterServer) RegisterExecutor(ctx context.Context, req *Reg func (*UnimplementedMasterServer) SubmitJob(ctx context.Context, req *SubmitJobRequest) (*SubmitJobResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SubmitJob not implemented") } +func (*UnimplementedMasterServer) CancelJob(ctx context.Context, req *CancelJobRequest) (*CancelJobResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CancelJob not implemented") +} func (*UnimplementedMasterServer) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Heartbeat not implemented") } @@ -830,6 +936,24 @@ func _Master_SubmitJob_Handler(srv interface{}, ctx context.Context, dec func(in return interceptor(ctx, in, info, handler) } +func _Master_CancelJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CancelJobRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MasterServer).CancelJob(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pb.Master/CancelJob", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MasterServer).CancelJob(ctx, req.(*CancelJobRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Master_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(HeartbeatRequest) if err := dec(in); err != nil { @@ -914,6 +1038,10 @@ var _Master_serviceDesc = grpc.ServiceDesc{ MethodName: "SubmitJob", Handler: _Master_SubmitJob_Handler, }, + { + MethodName: "CancelJob", + Handler: _Master_CancelJob_Handler, + }, { MethodName: "Heartbeat", Handler: _Master_Heartbeat_Handler, @@ -1060,6 +1188,34 @@ func (m *SubmitJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *CancelJobRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CancelJobRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CancelJobRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.JobId != 0 { + i = encodeVarintMaster(dAtA, i, uint64(m.JobId)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *SubmitJobResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1100,6 +1256,41 @@ func (m *SubmitJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *CancelJobResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CancelJobResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CancelJobResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Err != nil { + { + size, err := m.Err.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintMaster(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *RegisterExecutorRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1409,6 +1600,18 @@ func (m *SubmitJobRequest) Size() (n int) { return n } +func (m *CancelJobRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.JobId != 0 { + n += 1 + sovMaster(uint64(m.JobId)) + } + return n +} + func (m *SubmitJobResponse) Size() (n int) { if m == nil { return 0 @@ -1425,6 +1628,19 @@ func (m *SubmitJobResponse) Size() (n int) { return n } +func (m *CancelJobResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Err != nil { + l = m.Err.Size() + n += 1 + l + sovMaster(uint64(l)) + } + return n +} + func (m *RegisterExecutorRequest) Size() (n int) { if m == nil { return 0 @@ -1902,6 +2118,75 @@ func (m *SubmitJobRequest) Unmarshal(dAtA []byte) error { } return nil } +func (m *CancelJobRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CancelJobRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CancelJobRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field JobId", wireType) + } + m.JobId = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.JobId |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipMaster(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMaster + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *SubmitJobResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -2007,6 +2292,92 @@ func (m *SubmitJobResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *CancelJobResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CancelJobResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CancelJobResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Err", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMaster + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMaster + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMaster + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Err == nil { + m.Err = &Error{} + } + if err := m.Err.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMaster(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMaster + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *RegisterExecutorRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pb/test.pb.go b/pb/test.pb.go index d4c1c46bb3c..c75b5710266 100644 --- a/pb/test.pb.go +++ b/pb/test.pb.go @@ -58,7 +58,10 @@ type Record struct { Tid int32 `protobuf:"varint,3,opt,name=tid,proto3" json:"tid,omitempty"` Gtid int32 `protobuf:"varint,4,opt,name=gtid,proto3" json:"gtid,omitempty"` Pk int32 `protobuf:"varint,5,opt,name=pk,proto3" json:"pk,omitempty"` - Err *Error `protobuf:"bytes,6,opt,name=err,proto3" json:"err,omitempty"` + // for record time + TimeTracer []int64 `protobuf:"varint,6,rep,packed,name=time_tracer,json=timeTracer,proto3" json:"time_tracer,omitempty"` + // error + Err *Error `protobuf:"bytes,7,opt,name=err,proto3" json:"err,omitempty"` } func (m *Record) Reset() { *m = Record{} } @@ -129,6 +132,13 @@ func (m *Record) GetPk() int32 { return 0 } +func (m *Record) GetTimeTracer() []int64 { + if m != nil { + return m.TimeTracer + } + return nil +} + func (m *Record) GetErr() *Error { if m != nil { return m.Err @@ -189,25 +199,27 @@ func init() { func init() { proto.RegisterFile("test.proto", fileDescriptor_c161fcfdc0c3ff1e) } var fileDescriptor_c161fcfdc0c3ff1e = []byte{ - // 288 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0xd0, 0xcf, 0x4a, 0xfb, 0x40, - 0x10, 0x07, 0xf0, 0xec, 0xa6, 0xed, 0xef, 0xd7, 0x29, 0x94, 0x3a, 0x20, 0x2c, 0x15, 0xd7, 0x52, - 0x10, 0x7b, 0x0a, 0x1a, 0x5f, 0x40, 0x4a, 0xf4, 0xe4, 0x29, 0x16, 0xaf, 0x92, 0x3f, 0x43, 0x0d, - 0x55, 0xb3, 0x6e, 0xd6, 0x82, 0x6f, 0xe1, 0x33, 0x79, 0xf2, 0xd8, 0xa3, 0x47, 0x49, 0x5e, 0x44, - 0x76, 0x23, 0xe6, 0xe0, 0x69, 0x87, 0x0f, 0xbb, 0xfb, 0x9d, 0x19, 0x00, 0x43, 0x95, 0x09, 0x94, - 0x2e, 0x4d, 0x89, 0x5c, 0xa5, 0xd3, 0x11, 0x69, 0x5d, 0xea, 0x16, 0xe6, 0xef, 0x0c, 0x06, 0x31, - 0x65, 0xa5, 0xce, 0xf1, 0x18, 0xb8, 0x51, 0x82, 0xcd, 0xd8, 0x62, 0x1c, 0xee, 0x07, 0x2a, 0x0d, - 0x5a, 0xff, 0x39, 0x56, 0xaf, 0x8a, 0x62, 0x6e, 0x14, 0x1e, 0x02, 0x54, 0xd9, 0x3d, 0x3d, 0x26, - 0x77, 0x5b, 0xd2, 0x82, 0xcf, 0xd8, 0xa2, 0x1f, 0x0f, 0x5b, 0xb9, 0x25, 0x8d, 0x13, 0xf0, 0x4d, - 0x91, 0x0b, 0xdf, 0xb9, 0x2d, 0x11, 0xa1, 0xb7, 0xb6, 0xd4, 0x73, 0xe4, 0x6a, 0x1c, 0x03, 0x57, - 0x1b, 0xd1, 0x77, 0xc2, 0xd5, 0x06, 0x0f, 0xc0, 0x27, 0xad, 0xc5, 0x60, 0xc6, 0x16, 0xa3, 0x70, - 0x68, 0xc3, 0x2f, 0x6d, 0x93, 0xb1, 0xd5, 0xf9, 0x11, 0x40, 0xd7, 0x03, 0xfe, 0x87, 0x5e, 0x94, - 0x98, 0x64, 0xe2, 0xe1, 0x3f, 0xf0, 0xa3, 0xe8, 0x7a, 0xc2, 0xe6, 0x27, 0xb0, 0xb7, 0xa2, 0xca, - 0x2c, 0x8b, 0xa7, 0x87, 0x72, 0x1d, 0xd3, 0xf3, 0x0b, 0x55, 0xe6, 0x37, 0x96, 0x75, 0xb1, 0xe1, - 0x05, 0x8c, 0xec, 0xc5, 0x1b, 0xd2, 0xdb, 0x22, 0x23, 0x3c, 0x03, 0xb8, 0x22, 0xca, 0xdb, 0x77, - 0xe8, 0x66, 0xfe, 0xf3, 0xcf, 0x14, 0xba, 0x55, 0x9c, 0xb2, 0xa5, 0xf8, 0xa8, 0x25, 0xdb, 0xd5, - 0x92, 0x7d, 0xd5, 0x92, 0xbd, 0x35, 0xd2, 0xdb, 0x35, 0xd2, 0xfb, 0x6c, 0xa4, 0x97, 0x0e, 0xdc, - 0x42, 0xcf, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x14, 0x71, 0x4b, 0xff, 0x6f, 0x01, 0x00, 0x00, + // 305 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0xd0, 0xcf, 0x4a, 0xc3, 0x40, + 0x10, 0x06, 0xf0, 0x6c, 0xd2, 0x3f, 0x76, 0x02, 0xa5, 0x0e, 0x08, 0x4b, 0xc5, 0x34, 0x14, 0xc4, + 0x9c, 0x8a, 0xd6, 0x17, 0x90, 0x52, 0x3d, 0x79, 0x5a, 0x8b, 0xd7, 0x92, 0xa6, 0x43, 0x0d, 0xb5, + 0x66, 0x9d, 0xac, 0x05, 0xdf, 0xc2, 0xc7, 0xf2, 0xd8, 0xa3, 0x47, 0x69, 0x5e, 0x44, 0x76, 0x23, + 0xf6, 0xe0, 0x69, 0x87, 0x1f, 0xbb, 0xcb, 0x37, 0x1f, 0x80, 0xa1, 0xd2, 0x8c, 0x34, 0x17, 0xa6, + 0x40, 0x5f, 0x2f, 0xfa, 0x21, 0x31, 0x17, 0x5c, 0xc3, 0xb0, 0x12, 0xd0, 0x52, 0x94, 0x15, 0xbc, + 0xc4, 0x73, 0xf0, 0x8d, 0x96, 0x22, 0x16, 0x49, 0x77, 0x7c, 0x32, 0xd2, 0x8b, 0x51, 0xed, 0xbf, + 0xc7, 0xec, 0x5d, 0x93, 0xf2, 0x8d, 0xc6, 0x33, 0x80, 0x32, 0x7b, 0xa2, 0x4d, 0x3a, 0xdf, 0x12, + 0x4b, 0x3f, 0x16, 0x49, 0x53, 0x75, 0x6a, 0x79, 0x24, 0xc6, 0x1e, 0x04, 0x26, 0x5f, 0xca, 0xc0, + 0xb9, 0x1d, 0x11, 0xa1, 0xb1, 0xb2, 0xd4, 0x70, 0xe4, 0x66, 0xec, 0x82, 0xaf, 0xd7, 0xb2, 0xe9, + 0xc4, 0xd7, 0x6b, 0x1c, 0x40, 0x68, 0xf2, 0x0d, 0xcd, 0x0d, 0xa7, 0x19, 0xb1, 0x6c, 0xc5, 0x41, + 0x12, 0x28, 0xb0, 0x34, 0x73, 0x82, 0xa7, 0x10, 0x10, 0xb3, 0x6c, 0xc7, 0x22, 0x09, 0xc7, 0x1d, + 0x9b, 0xee, 0xd6, 0x6e, 0xa1, 0xac, 0x0e, 0x07, 0x00, 0x87, 0x90, 0x78, 0x04, 0x8d, 0x69, 0x6a, + 0xd2, 0x9e, 0x87, 0x6d, 0x08, 0xa6, 0xd3, 0xfb, 0x9e, 0x18, 0x5e, 0xc0, 0xf1, 0x8c, 0x4a, 0x33, + 0xc9, 0x5f, 0x9e, 0x8b, 0x95, 0xa2, 0xd7, 0x37, 0x2a, 0xcd, 0x5f, 0x2e, 0x71, 0xc8, 0x35, 0xbe, + 0x81, 0xd0, 0x5e, 0x7c, 0x20, 0xde, 0xe6, 0x19, 0xe1, 0x15, 0xc0, 0x1d, 0xd1, 0xb2, 0x7e, 0x87, + 0xae, 0x94, 0x7f, 0xff, 0xf4, 0xe1, 0xd0, 0xd5, 0xa5, 0x98, 0xc8, 0xcf, 0x7d, 0x24, 0x76, 0xfb, + 0x48, 0x7c, 0xef, 0x23, 0xf1, 0x51, 0x45, 0xde, 0xae, 0x8a, 0xbc, 0xaf, 0x2a, 0xf2, 0x16, 0x2d, + 0xd7, 0xf8, 0xf5, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x27, 0x14, 0xe3, 0x72, 0x90, 0x01, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -347,6 +359,25 @@ func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintTest(dAtA, i, uint64(size)) } i-- + dAtA[i] = 0x3a + } + if len(m.TimeTracer) > 0 { + dAtA3 := make([]byte, len(m.TimeTracer)*10) + var j2 int + for _, num1 := range m.TimeTracer { + num := uint64(num1) + for num >= 1<<7 { + dAtA3[j2] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j2++ + } + dAtA3[j2] = uint8(num) + j2++ + } + i -= j2 + copy(dAtA[i:], dAtA3[:j2]) + i = encodeVarintTest(dAtA, i, uint64(j2)) + i-- dAtA[i] = 0x32 } if m.Pk != 0 { @@ -437,6 +468,13 @@ func (m *Record) Size() (n int) { if m.Pk != 0 { n += 1 + sovTest(uint64(m.Pk)) } + if len(m.TimeTracer) > 0 { + l = 0 + for _, e := range m.TimeTracer { + l += sovTest(uint64(e)) + } + n += 1 + sovTest(uint64(l)) + l + } if m.Err != nil { l = m.Err.Size() n += 1 + l + sovTest(uint64(l)) @@ -587,6 +625,82 @@ func (m *Record) Unmarshal(dAtA []byte) error { } } case 6: + if wireType == 0 { + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTest + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TimeTracer = append(m.TimeTracer, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTest + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthTest + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthTest + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.TimeTracer) == 0 { + m.TimeTracer = make([]int64, 0, elementCount) + } + for iNdEx < postIndex { + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTest + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.TimeTracer = append(m.TimeTracer, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field TimeTracer", wireType) + } + case 7: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Err", wireType) } diff --git a/proto/CDCPeerToPeer.proto b/proto/CDCPeerToPeer.proto deleted file mode 100644 index 786ecc557d6..00000000000 --- a/proto/CDCPeerToPeer.proto +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package pb; - -option optimize_for = SPEED; - -service CDCPeerToPeer { - rpc SendMessage(stream MessagePacket) returns (stream SendMessageResponse); -} - -message MessageEntry { - string topic = 1; - bytes content = 2; - int64 sequence = 3; -} - -message StreamMeta { - string sender_id = 1; - string receiver_id = 2; - int64 epoch = 3; -} - -message MessagePacket { - StreamMeta stream_meta = 1; - repeated MessageEntry entries = 2; -} - -message Ack { - string topic = 1; - int64 last_seq = 2; -} - -enum ExitReason { - NONE = 0; - CONGESTED = 1; - CAPTURE_SUICIDE = 2; - STALE_CONNECTION = 3; - DUPLICATE_CONNECTION = 4; - CAPTURE_ID_MISMATCH = 5; - OTHER = 100; -} - -message SendMessageResponse { - repeated Ack ack = 1; - ExitReason exit_reason = 2; - string error_message = 3; -} diff --git a/proto/master.proto b/proto/master.proto index 23938b06e9e..601b92f3c12 100644 --- a/proto/master.proto +++ b/proto/master.proto @@ -22,6 +22,8 @@ service Master { //}; } + rpc CancelJob(CancelJobRequest) returns(CancelJobResponse) {} + //GetMembers returns the available master members //rpc GetMembers(GetMembersRequest) {} @@ -69,11 +71,19 @@ message SubmitJobRequest { // TODO: Resource Limit } +message CancelJobRequest { + int32 job_id = 1; +} + message SubmitJobResponse { Error err = 1; int32 job_id = 2; } +message CancelJobResponse { + Error err = 1; +} + message RegisterExecutorRequest { // dm need 'worker-name' to locate the worker. // TODO: Do we really need a "worker name"? Can we use address to identify an executor? diff --git a/proto/test.proto b/proto/test.proto index b16bd016715..a5c8aa217a2 100644 --- a/proto/test.proto +++ b/proto/test.proto @@ -13,7 +13,10 @@ message Record { int32 tid = 3; int32 gtid = 4; int32 pk = 5; - Error err = 6; + // for record time + repeated int64 time_tracer = 6; + // error + Error err = 7; } message TestBinlogRequest { diff --git a/test/job_test.go b/test/job_test.go index 1fa42c297eb..1677a371bef 100644 --- a/test/job_test.go +++ b/test/job_test.go @@ -57,7 +57,7 @@ func (t *testJobSuite) TestSubmit(c *C) { c.Assert(err, IsNil) testJobConfig := benchmark.Config{ Servers: []string{"127.0.0.1:9999", "127.0.0.1:9998", "127.0.0.1:9997"}, - FlowID: "job test", + FlowID: "jobtest", TableNum: 10, RecordCnt: 10000, DDLFrequency: 100, @@ -81,4 +81,9 @@ func (t *testJobSuite) TestSubmit(c *C) { for _, cnt := range tablesCnt { c.Assert(cnt, Equals, testJobConfig.RecordCnt) } + resp1, err := client.CancelJob(context.Background(), &pb.CancelJobRequest{ + JobId: resp.JobId, + }) + c.Assert(err, IsNil) + c.Assert(resp1.Err, IsNil) } diff --git a/test/mock/grpc.go b/test/mock/grpc.go index 4583810d39b..cc8348dc5fc 100644 --- a/test/mock/grpc.go +++ b/test/mock/grpc.go @@ -55,6 +55,8 @@ func (s *masterServerConn) sendRequest(ctx context.Context, req interface{}) (in return s.server.Heartbeat(ctx, x) case *pb.TaskSchedulerRequest: return s.server.ScheduleTask(ctx, x) + case *pb.CancelJobRequest: + return s.server.CancelJob(ctx, x) } return nil, errors.New("unknown request") } @@ -73,6 +75,11 @@ func (c *masterServerClient) SubmitJob(ctx context.Context, req *pb.SubmitJobReq return resp.(*pb.SubmitJobResponse), err } +func (c *masterServerClient) CancelJob(ctx context.Context, req *pb.CancelJobRequest, opts ...grpc.CallOption) (*pb.CancelJobResponse, error) { + resp, err := c.conn.sendRequest(ctx, req) + return resp.(*pb.CancelJobResponse), err +} + func (c *masterServerClient) Heartbeat(ctx context.Context, req *pb.HeartbeatRequest, opts ...grpc.CallOption) (*pb.HeartbeatResponse, error) { resp, err := c.conn.sendRequest(ctx, req) return resp.(*pb.HeartbeatResponse), err