Skip to content

Commit

Permalink
feat: Implement Timeout Configuration for DAG Tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiyo510 committed Aug 11, 2024
1 parent a10fff5 commit 95a81ca
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 0 deletions.
1 change: 1 addition & 0 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (a *Agent) newScheduler() *scheduler.Scheduler {
LogDir: a.logDir,
Logger: a.logger,
MaxActiveRuns: a.dag.MaxActiveRuns,
Timeout: a.dag.Timeout,
Delay: a.dag.Delay,
Dry: a.dry,
ReqID: a.requestID,
Expand Down
14 changes: 14 additions & 0 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ func TestAgent_Run(t *testing.T) {
// Check if the status is saved correctly
require.Equal(t, scheduler.StatusError, agt.Status().Status)
})
t.Run("FinishWithTimeout", func(t *testing.T) {
setup := test.SetupTest(t)
defer setup.Cleanup()

// Run a DAG that timeout
timeoutDAG := testLoadDAG(t, "timeout.yaml")
agt := newAgent(setup, genRequestID(), timeoutDAG, &agent.Options{})
ctx := context.Background()
err := agt.Run(ctx)
require.Error(t, err)

// Check if the status is saved correctly
require.Equal(t, scheduler.StatusError, agt.Status().Status)
})
t.Run("ReceiveSignal", func(t *testing.T) {
setup := test.SetupTest(t)
defer setup.Cleanup()
Expand Down
6 changes: 6 additions & 0 deletions internal/agent/testdata/timeout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
timeout: 2
steps:
- name: "1"
command: "sleep 1"
- name: "2"
command: "sleep 2"
1 change: 1 addition & 0 deletions internal/dag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (b *builder) build(def *definition, envs []string) (*DAG, error) {
Name: def.Name,
Group: def.Group,
Description: def.Description,
Timeout: time.Second * time.Duration(def.Timeout),
Delay: time.Second * time.Duration(def.DelaySec),
RestartWait: time.Second * time.Duration(def.RestartWaitSec),
Tags: parseTags(def.Tags),
Expand Down
2 changes: 2 additions & 0 deletions internal/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ type DAG struct {
// MailOn contains the conditions to send mail.
MailOn *MailOn `json:"MailOn"`

// Timeout is a field to specify the maximum execution time of the DAG task
Timeout time.Duration `json:"Timeout"`
// Misc configuration for DAG execution.
// Delay is the delay before starting the DAG.
Delay time.Duration `json:"Delay"`
Expand Down
1 change: 1 addition & 0 deletions internal/dag/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type definition struct {
MailOn *mailOnDef
ErrorMail mailConfigDef
InfoMail mailConfigDef
Timeout int
DelaySec int
RestartWaitSec int
HistRetentionDays *int
Expand Down
21 changes: 21 additions & 0 deletions internal/dag/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Scheduler struct {
logDir string
logger logger.Logger
maxActiveRuns int
timeout time.Duration
delay time.Duration
dry bool
onExit *dag.Step
Expand All @@ -82,6 +83,7 @@ func New(cfg *Config) *Scheduler {
logDir: cfg.LogDir,
logger: lg,
maxActiveRuns: cfg.MaxActiveRuns,
timeout: cfg.Timeout,
delay: cfg.Delay,
dry: cfg.Dry,
onExit: cfg.OnExit,
Expand All @@ -96,6 +98,7 @@ type Config struct {
LogDir string
Logger logger.Logger
MaxActiveRuns int
Timeout time.Duration
Delay time.Duration
Dry bool
OnExit *dag.Step
Expand All @@ -115,6 +118,12 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan

var wg = sync.WaitGroup{}

var cancel context.CancelFunc
if sc.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, sc.timeout)
defer cancel()
}

for !sc.isFinished(g) {
if sc.isCanceled() {
break
Expand Down Expand Up @@ -168,6 +177,14 @@ func (sc *Scheduler) Schedule(ctx context.Context, g *ExecutionGraph, done chan
switch {
case status == NodeStatusSuccess || status == NodeStatusCancel:
// do nothing
case sc.isTimeout(g.startedAt):
sc.logger.Info(
"Step execution deadline exceeded",
"step", node.data.Step.Name,
"error", execErr,
)
node.setStatus(NodeStatusCancel)
sc.setLastError(execErr)
case sc.isCanceled():
sc.setLastError(execErr)
case node.data.Step.RetryPolicy != nil && node.data.Step.RetryPolicy.Limit > node.getRetryCount():
Expand Down Expand Up @@ -480,6 +497,10 @@ func (sc *Scheduler) isSucceed(g *ExecutionGraph) bool {
return true
}

func (sc *Scheduler) isTimeout(startedAt time.Time) bool {
return sc.timeout > 0 && time.Since(startedAt) > sc.timeout
}

var (
errUpstreamFailed = fmt.Errorf("upstream failed")
errUpstreamSkipped = fmt.Errorf("upstream skipped")
Expand Down
25 changes: 25 additions & 0 deletions internal/dag/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,31 @@ func TestSchedulerCancel(t *testing.T) {
require.Equal(t, NodeStatusNone, nodes[2].State().Status)
}

func TestSchedulerTimeout(t *testing.T) {
g, _ := NewExecutionGraph(
logger.Default,
step("1", "sleep 1"),
step("2", "sleep 1"),
step("3", "sleep 3"),
step("4", "sleep 10"),
step("5", "sleep 1", "2"),
step("6", "sleep 1", "5"),
)
sc := New(&Config{Timeout: time.Second * 2, LogDir: testHomeDir})

err := sc.Schedule(context.Background(), g, nil)
require.Error(t, err)
require.Equal(t, sc.Status(g), StatusError)

nodes := g.Nodes()
require.Equal(t, NodeStatusSuccess, nodes[0].State().Status)
require.Equal(t, NodeStatusSuccess, nodes[1].State().Status)
require.Equal(t, NodeStatusCancel, nodes[2].State().Status)
require.Equal(t, NodeStatusCancel, nodes[3].State().Status)
require.Equal(t, NodeStatusCancel, nodes[4].State().Status)
require.Equal(t, NodeStatusCancel, nodes[5].State().Status)
}

func TestSchedulerRetryFail(t *testing.T) {
cmd := filepath.Join(util.MustGetwd(), "testdata/testfile.sh")
g, sc, err := testSchedule(t,
Expand Down

0 comments on commit 95a81ca

Please sign in to comment.