Skip to content

Commit

Permalink
lib/*(engine): Implement worker exit fast path (pingcap#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored May 17, 2022
1 parent c16e9f0 commit 88da691
Show file tree
Hide file tree
Showing 14 changed files with 518 additions and 46 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/dataflow_engine_e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,46 @@ jobs:
name: node-failure-workflow-logs
path: logs/*

Worker-error-workflow:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2

- uses: actions/setup-go@v3
with:
go-version: 1.18

- name: Build images
run: $GITHUB_WORKSPACE/sample/prepare.sh

- name: Run containers
run: docker-compose -f $GITHUB_WORKSPACE/sample/3m3e.yaml up -d

- name: Run tests
run: |
cd $GITHUB_WORKSPACE/test/e2e
go test -count=1 -v -run=TestWorkerExit
- name: Dump docker container logs on failure
if: ${{ failure() }}
uses: jwalton/gh-docker-logs@v2
with:
tail: '100'

- name: Collect docker logs on failure
if: ${{ failure() }}
uses: jwalton/gh-docker-logs@v2
with:
dest: 'logs'

- name: Upload logs to GitHub
if: ${{ failure() }}
uses: actions/upload-artifact@master
with:
name: node-failure-workflow-logs
path: logs/*

DM-workflow:
runs-on: ubuntu-latest

Expand Down
11 changes: 7 additions & 4 deletions lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package lib
import (
"context"

"github.com/hanfei1991/microcosm/pkg/errctx"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"

"github.com/hanfei1991/microcosm/executor/worker"
libModel "github.com/hanfei1991/microcosm/lib/model"
"github.com/hanfei1991/microcosm/model"
dcontext "github.com/hanfei1991/microcosm/pkg/context"
"github.com/hanfei1991/microcosm/pkg/errctx"
derror "github.com/hanfei1991/microcosm/pkg/errors"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
"github.com/hanfei1991/microcosm/pkg/meta/metaclient"
"github.com/hanfei1991/microcosm/pkg/p2p"
)
Expand Down Expand Up @@ -142,7 +142,10 @@ func (d *DefaultBaseJobMaster) Poll(ctx context.Context) error {
return errors.Trace(err)
}
if err := d.worker.doPoll(ctx); err != nil {
return errors.Trace(err)
if derror.ErrWorkerHalfExit.NotEqual(err) {
return errors.Trace(err)
}
return nil
}
if err := d.impl.Tick(ctx); err != nil {
return errors.Trace(err)
Expand Down
10 changes: 8 additions & 2 deletions lib/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Config struct {
EtcdWatchEnable bool `json:"etcd-watch-enable"`
EtcdEndpoints []string `json:"etcd-endpoints"`
EtcdWatchPrefix string `json:"etcd-watch-prefix"`

InjectErrorInterval time.Duration `json:"inject-error-interval"`
}

// Checkpoint defines the checkpoint of fake job
Expand Down Expand Up @@ -436,6 +438,9 @@ func (m *Master) OnWorkerMessage(worker lib.WorkerHandle, topic p2p.Topic, messa

// OnWorkerStatusUpdated implements MasterImpl.OnWorkerStatusUpdated
func (m *Master) OnWorkerStatusUpdated(worker lib.WorkerHandle, newStatus *libModel.WorkerStatus) error {
log.L().Info("FakeMaster: worker status updated",
zap.String("worker-id", worker.ID()),
zap.Any("worker-status", newStatus))
return nil
}

Expand Down Expand Up @@ -532,7 +537,8 @@ func (m *Master) genWorkerConfig(index int, checkpoint workerCheckpoint) *Worker
EtcdWatchPrefix: m.config.EtcdWatchPrefix,

// loaded from checkpoint if exists
Checkpoint: checkpoint,
Checkpoint: checkpoint,
InjectErrorInterval: m.config.InjectErrorInterval,
}
}

Expand All @@ -546,7 +552,7 @@ func NewFakeMaster(ctx *dcontext.Context, workerID libModel.WorkerID, masterID l
workerList: make([]lib.WorkerHandle, masterConfig.WorkerCount),
workerID2BusinessID: make(map[libModel.WorkerID]int),
config: masterConfig,
statusRateLimiter: rate.NewLimiter(rate.Every(time.Second*3), 1),
statusRateLimiter: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
bStatus: &businessStatus{status: make(map[libModel.WorkerID]*dummyWorkerStatus)},
finishedSet: make(map[libModel.WorkerID]int),
ctx: ctx.Context,
Expand Down
17 changes: 13 additions & 4 deletions lib/fake/fake_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ type (
ID int `json:"id"`
TargetTick int64 `json:"target-tick"`

EtcdWatchEnable bool `json:"etcd-watch-enable"`
EtcdEndpoints []string `json:"etcd-endpoints"`
EtcdWatchPrefix string `json:"etcd-watch-prefix"`
EtcdWatchEnable bool `json:"etcd-watch-enable"`
EtcdEndpoints []string `json:"etcd-endpoints"`
EtcdWatchPrefix string `json:"etcd-watch-prefix"`
InjectErrorInterval time.Duration `json:"inject-error-interval"`

Checkpoint workerCheckpoint `json:"checkpoint"`
}
Expand All @@ -57,6 +58,8 @@ type (
sync.RWMutex
code libModel.WorkerStatusCode
}

startTime time.Time
}
)

Expand Down Expand Up @@ -102,6 +105,7 @@ func (d *dummyWorker) InitImpl(ctx context.Context) error {
}
d.init = true
d.setStatusCode(libModel.WorkerStatusNormal)
d.startTime = time.Now()
return nil
}
return errors.New("repeated init")
Expand Down Expand Up @@ -144,6 +148,11 @@ func (d *dummyWorker) Tick(ctx context.Context) error {
return d.Exit(ctx, d.Status(), nil)
}

if d.config.InjectErrorInterval != 0 {
if time.Since(d.startTime) > d.config.InjectErrorInterval {
return errors.Errorf("injected error by worker: %d", d.config.ID)
}
}
return nil
}

Expand Down Expand Up @@ -277,7 +286,7 @@ func NewDummyWorker(
},
}
return &dummyWorker{
statusRateLimiter: rate.NewLimiter(rate.Every(time.Second*3), 1),
statusRateLimiter: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
status: status,
config: wcfg,
errCh: make(chan error, 1),
Expand Down
1 change: 1 addition & 0 deletions lib/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (m *DefaultBaseMaster) registerMessageHandlers(ctx context.Context) error {
ReplyTime: m.clock.Now(),
ToWorkerID: msg.FromWorkerID,
Epoch: m.currentEpoch.Load(),
IsFinished: msg.IsFinished,
})
if err != nil {
return err
Expand Down
13 changes: 12 additions & 1 deletion lib/master/worker_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/atomic"
"go.uber.org/zap"

libModel "github.com/hanfei1991/microcosm/lib/model"
Expand Down Expand Up @@ -53,6 +54,8 @@ type workerEntry struct {
expireAt time.Time
state workerEntryState

receivedFinish atomic.Bool

statusMu sync.RWMutex
status *libModel.WorkerStatus
}
Expand Down Expand Up @@ -97,7 +100,7 @@ func (e *workerEntry) MarkAsTombstone() {
e.mu.Lock()
defer e.mu.Unlock()

if e.state == workerEntryWait || e.state == workerEntryOffline {
if e.state == workerEntryWait || e.state == workerEntryOffline || e.IsFinished() {
// Only workerEntryWait and workerEntryOffline are allowed
// to transition to workerEntryTombstone.
e.state = workerEntryTombstone
Expand Down Expand Up @@ -167,3 +170,11 @@ func (e *workerEntry) ExpireTime() time.Time {

return e.expireAt
}

func (e *workerEntry) SetFinished() {
e.receivedFinish.Store(true)
}

func (e *workerEntry) IsFinished() bool {
return e.receivedFinish.Load()
}
27 changes: 17 additions & 10 deletions lib/master/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,17 @@ func (m *WorkerManager) InitAfterRecover(ctx context.Context) (retErr error) {
zap.Duration("duration", m.clock.Since(startTime)))
case <-timer.C:
// Wait for the worker timeout to expire
m.mu.Lock()
for _, entry := range m.workerEntries {
if entry.State() == workerEntryWait {
entry.MarkAsTombstone()
}
}
m.mu.Unlock()
}

m.mu.Lock()
for _, entry := range m.workerEntries {
if entry.State() == workerEntryWait || entry.IsFinished() {
entry.MarkAsTombstone()
}
}
m.state = workerManagerReady
m.mu.Unlock()

return nil
}

Expand All @@ -222,6 +223,10 @@ func (m *WorkerManager) HandleHeartbeat(msg *libModel.HeartbeatPingMessage, from
return
}

if msg.IsFinished {
entry.SetFinished()
}

entry.SetExpireTime(m.nextExpireTime())

if m.state == workerManagerWaitingHeartbeat {
Expand Down Expand Up @@ -463,12 +468,14 @@ func (m *WorkerManager) checkWorkerEntriesOnce() error {
continue
}

if entry.ExpireTime().After(m.clock.Now()) {
// Not timed out
hasTimedOut := entry.ExpireTime().Before(m.clock.Now())
shouldGoOffline := hasTimedOut || entry.IsFinished()
if !shouldGoOffline {
continue
}

// The worker has timed out.
// The worker has timed out, or has received a heartbeat
// with IsFinished == true.
entry.MarkAsOffline()

var offlineError error
Expand Down
Loading

0 comments on commit 88da691

Please sign in to comment.