Skip to content

Commit

Permalink
persist all the job and tasks info (pingcap#58)
Browse files Browse the repository at this point in the history
* persist job master

* pass test

* prepare for test

* address comments
  • Loading branch information
hanfei1991 authored Dec 23, 2021
1 parent b61d082 commit 8d040b4
Show file tree
Hide file tree
Showing 19 changed files with 401 additions and 212 deletions.
10 changes: 5 additions & 5 deletions executor/runtime/benchmark/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (o *opReceive) dial() (client pb.TestServiceClient, err error) {
return
}

func (o *opReceive) Prepare() error {
// Prepare implements the operator Prepare hook for opReceive. The task
// context is ignored and no setup is performed; it always returns nil.
func (o *opReceive) Prepare(_ *runtime.TaskContext) error {
	return nil
}

Expand Down Expand Up @@ -181,7 +181,7 @@ type opSyncer struct{}
// Close is a no-op for opSyncer and always returns nil.
func (o *opSyncer) Close() error {
	return nil
}

// TODO communicate with master.
func (o *opSyncer) Prepare() error { return nil }
// Prepare implements the operator Prepare hook for opSyncer. The task
// context is ignored and no setup is performed; it always returns nil.
func (o *opSyncer) Prepare(_ *runtime.TaskContext) error {
	return nil
}

func (o *opSyncer) syncDDL(ctx *runtime.TaskContext) {
time.Sleep(1 * time.Second)
Expand Down Expand Up @@ -218,7 +218,7 @@ func (o *opSink) Close() error {
return o.writer.writeStats(o.stats)
}

func (o *opSink) Prepare() error {
// Prepare resets the sink's statistics to a freshly allocated recordStats and
// then prepares the underlying writer, returning whatever the writer's
// Prepare returns. The task context is ignored.
func (o *opSink) Prepare(_ *runtime.TaskContext) error {
	o.stats = new(recordStats)
	err := o.writer.Prepare()
	return err
}
Expand Down Expand Up @@ -249,7 +249,7 @@ type opProducer struct {

// Close is a no-op for opProducer and always returns nil.
func (o *opProducer) Close() error {
	return nil
}

func (o *opProducer) Prepare() error { return nil }
// Prepare implements the operator Prepare hook for opProducer. The task
// context is ignored and no setup is performed; it always returns nil.
func (o *opProducer) Prepare(_ *runtime.TaskContext) error {
	return nil
}

// NextWantedInputIdx always returns runtime.DontNeedData.
func (o *opProducer) NextWantedInputIdx() int {
	return runtime.DontNeedData
}

Expand Down Expand Up @@ -325,7 +325,7 @@ func (o *opBinlog) Close() error {
return nil
}

func (o *opBinlog) Prepare() (err error) {
func (o *opBinlog) Prepare(_ *runtime.TaskContext) (err error) {
o.binlogChan = make(chan *runtime.Record, 1024)
if test.GlobalTestFlag {
o.server, err = mock.NewTestServer(o.addr, o)
Expand Down
58 changes: 58 additions & 0 deletions executor/runtime/jobmaster/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,25 @@ package jobmaster
import (
"context"
"encoding/json"
"strings"
"time"

"github.com/hanfei1991/microcosm/client"
"github.com/hanfei1991/microcosm/executor/runtime"
"github.com/hanfei1991/microcosm/jobmaster/benchmark"
"github.com/hanfei1991/microcosm/jobmaster/system"
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
"github.com/hanfei1991/microcosm/pkg/errors"
"github.com/hanfei1991/microcosm/pkg/metadata"
"github.com/hanfei1991/microcosm/test"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/pkg/logutil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

func RegisterBuilder() {
Expand All @@ -18,6 +30,44 @@ func RegisterBuilder() {

// jobMasterBuilder is a stateless builder whose Build method decodes a
// model.Operator into a model.JobMaster config and returns the operator that
// runs the corresponding job master.
type jobMasterBuilder struct{}

// getEtcdMetaKV asks the master for the service-discovery metastore address
// and returns a metadata.MetaKV backed by a new etcd client connected to it.
//
// The address reported by the master is treated as a comma-separated list of
// etcd endpoints. ctx is used for the query and is also installed as the etcd
// client's context. An error from either the query or the client construction
// is returned unchanged, together with a nil MetaKV.
func getEtcdMetaKV(ctx context.Context, clients *client.Manager) (metadata.MetaKV, error) {
	resp, err := clients.MasterClient().QueryMetaStore(
		ctx,
		&pb.QueryMetaStoreRequest{Tp: pb.StoreType_ServiceDiscovery},
		5*time.Second, // presumably the request timeout; confirm against the master client
	)
	if err != nil {
		return nil, err
	}

	// Read the address once through the getter so that the value that is
	// logged is exactly the value that is dialed, and so that the response is
	// never dereferenced directly.
	addr := resp.GetAddress()
	log.L().Info("update service discovery metastore", zap.String("addr", addr))

	// Start from etcd's default logger config but raise the level to error.
	logConfig := logutil.DefaultZapLoggerConfig
	logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel)

	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(addr, ","),
		Context:     ctx,
		LogConfig:   &logConfig,
		DialTimeout: 5 * time.Second,
		DialOptions: []grpc.DialOption{
			grpc.WithInsecure(),
			grpc.WithBlock(),
			grpc.WithConnectParams(grpc.ConnectParams{
				Backoff: backoff.Config{
					BaseDelay:  time.Second,
					Multiplier: 1.1,
					Jitter:     0.1,
					MaxDelay:   3 * time.Second,
				},
				MinConnectTimeout: 3 * time.Second,
			}),
		},
	})
	if err != nil {
		return nil, err
	}
	return metadata.NewMetaEtcd(etcdCli), nil
}

func (b *jobMasterBuilder) Build(op model.Operator) (runtime.Operator, bool, error) {
cfg := &model.JobMaster{}
err := json.Unmarshal(op, cfg)
Expand All @@ -30,6 +80,13 @@ func (b *jobMasterBuilder) Build(op model.Operator) (runtime.Operator, bool, err
if err != nil {
return nil, false, err
}
var metaKV metadata.MetaKV
if !test.GlobalTestFlag {
metaKV, err = getEtcdMetaKV(context.Background(), clients)
if err != nil {
return nil, false, err
}
}
switch cfg.Tp {
case model.Benchmark:
jobMaster, err = benchmark.BuildBenchmarkJobMaster(
Expand All @@ -41,6 +98,7 @@ func (b *jobMasterBuilder) Build(op model.Operator) (runtime.Operator, bool, err
return nil, false, err
}
return &jobMasterAgent{
metaKV: metaKV,
master: jobMaster,
}, true, nil
}
11 changes: 9 additions & 2 deletions executor/runtime/jobmaster/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@ import (

"github.com/hanfei1991/microcosm/executor/runtime"
"github.com/hanfei1991/microcosm/jobmaster/system"
"github.com/hanfei1991/microcosm/pkg/metadata"
"github.com/hanfei1991/microcosm/test"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
)

// jobMasterAgent is the operator that runs a job master: it holds the job
// master together with the metadata store that is handed to it on start.
type jobMasterAgent struct {
// metaKV is the metadata store passed to master.Start by Prepare. When
// test.GlobalTestFlag is set, Prepare replaces it with the store taken from
// the task's test context.
metaKV metadata.MetaKV
// master is the job master that Prepare starts.
master system.JobMaster
}

func (j *jobMasterAgent) Prepare() error {
return j.master.Start(context.Background())
// Prepare starts the wrapped job master with the agent's metadata store and
// returns the result of that start call.
//
// When test.GlobalTestFlag is set, the store is first replaced by the one
// exposed through ctx.TestCtx.
func (j *jobMasterAgent) Prepare(ctx *runtime.TaskContext) error {
	if test.GlobalTestFlag {
		j.metaKV = ctx.TestCtx.GetMetaKV()
	}

	// TODO: the starting routine cannot be cancelled here.
	startCtx := context.Background()
	return j.master.Start(startCtx, j.metaKV)
}

func (j *jobMasterAgent) Next(_ *runtime.TaskContext, _ *runtime.Record, _ int) ([]runtime.Chunk, bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion executor/runtime/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ package runtime
type Operator interface {
Next(ctx *TaskContext, r *Record, idx int) ([]Chunk, bool, error)
NextWantedInputIdx() int
Prepare() error
Prepare(ctx *TaskContext) error
Close() error
}
2 changes: 1 addition & 1 deletion executor/runtime/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type taskContainer struct {
func (t *taskContainer) prepare() error {
t.inputCache = make([]Chunk, len(t.inputs))
t.outputCache = make([]Chunk, len(t.outputs))
return t.op.Prepare()
return t.op.Prepare(t.ctx)
}

func (t *taskContainer) tryAwake() bool {
Expand Down
3 changes: 1 addition & 2 deletions jobmaster/benchmark/build.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package benchmark

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -127,7 +126,7 @@ func BuildBenchmarkJobMaster(
connectTwoTask(hashTask, sinkTask)
}

systemJobMaster := system.New(context.Background(), jobID, clients)
systemJobMaster := system.New(jobID, clients)
master := &jobMaster{
Master: systemJobMaster,
config: config,
Expand Down
29 changes: 17 additions & 12 deletions jobmaster/benchmark/jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/hanfei1991/microcosm/jobmaster/system"
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pkg/metadata"
)

type jobMaster struct {
Expand All @@ -15,18 +16,22 @@ type jobMaster struct {
stage2 []*model.Task
}

func (m *jobMaster) Start(ctx context.Context) error {
m.StartInternal()
// start stage1
err := m.DispatchTasks(ctx, m.stage1)
// start stage2
if err != nil {
return err
}
err = m.DispatchTasks(ctx, m.stage2)
if err != nil {
return err
}
// Start records metaKV on the job master (m.MetaKV), calls StartInternal with
// ctx, and then dispatches the stage1 tasks followed by the stage2 tasks.
// It always returns nil.
//
// TODO: Shall we pass an argument to indicate whether to recover from etcd?
func (m *jobMaster) Start(ctx context.Context, metaKV metadata.MetaKV) error {
m.MetaKV = metaKV
// Restoring each task through RestoreTask before dispatching is currently
// disabled; the intended code is kept below for reference.
//for _, task := range m.stage1 {
// if err := m.RestoreTask(ctx, task); err != nil {
// return err
// }
//}
//for _, task := range m.stage2 {
// if err := m.RestoreTask(ctx, task); err != nil {
// return err
// }
//}
m.StartInternal(ctx)
// Dispatch order is stage1 first, then stage2.
m.DispatchTasks(m.stage1...)
m.DispatchTasks(m.stage2...)
// TODO: Start the tasks manager to communicate.
return nil
}
Expand Down
Loading

0 comments on commit 8d040b4

Please sign in to comment.