Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): try to fix lock between workers and jobs #6689

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -849,7 +850,9 @@ func (a *API) Serve(ctx context.Context) error {
a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunCraft", func(ctx context.Context) {
a.WorkflowRunCraft(ctx, 100*time.Millisecond)
})

a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunJobDeletion", func(ctx context.Context) {
a.WorkflowRunJobDeletion(ctx, time.Duration(10*rand.Float64())*time.Second, 10)
})
a.GoRoutines.RunWithRestart(ctx, "api.V2WorkflowRunCraft", func(ctx context.Context) {
a.V2WorkflowRunCraft(ctx, 10*time.Second)
})
Expand Down
14 changes: 7 additions & 7 deletions engine/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package api

import (
"context"
"github.com/ovh/cds/engine/api/link"
"github.com/ovh/cds/engine/api/organization"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/api/authentication/builtin"
"github.com/ovh/cds/engine/api/authentication/local"
authdrivertest "github.com/ovh/cds/engine/api/authentication/test"
"github.com/ovh/cds/engine/api/bootstrap"
"github.com/ovh/cds/engine/api/link"
"github.com/ovh/cds/engine/api/organization"
apiTest "github.com/ovh/cds/engine/api/test"
"github.com/ovh/cds/engine/api/workflow"
"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/service"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/sdk"

"github.com/gorilla/mux"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
)

func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.FakeTransaction, *Router) {
Expand Down Expand Up @@ -63,7 +63,7 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.Fak
// Clean all the pending crafting workflow runs
lockKey := cache.Key("api:workflowRunCraft")
require.NoError(t, store.DeleteAll(lockKey))
ids, _ := workflow.LoadCratingWorkflowRunIDs(api.mustDB())
ids, _ := workflow.LoadCraftingWorkflowRunIDs(api.mustDB())
for _, id := range ids {
require.NoError(t, workflow.UpdateCraftedWorkflowRun(api.mustDB(), id))
}
Expand Down
34 changes: 28 additions & 6 deletions engine/api/workflow/dao_node_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func LoadNodeJobRunIDByNodeRunID(db gorp.SqlExecutor, runNodeID int64) ([]int64,
return ids, nil
}

//LoadNodeJobRun load a NodeJobRun given its ID
// LoadNodeJobRun load a NodeJobRun given its ID
func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
j := JobRun{}
query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1`
Expand All @@ -252,7 +252,7 @@ func LoadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
return &jr, nil
}

//LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker
// LoadDeadNodeJobRun load a NodeJobRun which is Building but without worker
func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store) ([]sdk.WorkflowNodeJobRun, error) {
var deadJobsDB []JobRun
query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE worker_id IS NULL`
Expand All @@ -279,7 +279,29 @@ func LoadDeadNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.St
return deadJobs, nil
}

//LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID
func LoadAndLockTerminatedNodeJobRun(ctx context.Context, db gorp.SqlExecutor, limit int) ([]sdk.WorkflowNodeJobRun, error) {
var terminatedJobsDB []JobRun
query := `SELECT workflow_node_run_job.* FROM workflow_node_run_job WHERE status IN ($1, $2, $3) ORDER BY id ASC LIMIT $4 FOR UPDATE SKIP LOCKED`
if _, err := db.Select(&terminatedJobsDB, query, sdk.StatusStopped, sdk.StatusSuccess, sdk.StatusFail, limit); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}

deadJobs := make([]sdk.WorkflowNodeJobRun, len(terminatedJobsDB))
for i, deadJob := range terminatedJobsDB {
jr, err := deadJob.WorkflowNodeRunJob()
if err != nil {
return nil, err
}
deadJobs[i] = jr
}

return deadJobs, nil
}

// LoadAndLockNodeJobRunWait load for update a NodeJobRun given its ID
func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
j := JobRun{}
query := `select workflow_node_run_job.* from workflow_node_run_job where id = $1 for update`
Expand All @@ -294,7 +316,7 @@ func LoadAndLockNodeJobRunWait(ctx context.Context, db gorp.SqlExecutor, store c
return &jr, nil
}

//LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID
// LoadAndLockNodeJobRunSkipLocked load for update a NodeJobRun given its ID
func LoadAndLockNodeJobRunSkipLocked(ctx context.Context, db gorp.SqlExecutor, store cache.Store, id int64) (*sdk.WorkflowNodeJobRun, error) {
var end func()
_, end = telemetry.Span(ctx, "workflow.LoadAndLockNodeJobRunSkipLocked")
Expand Down Expand Up @@ -329,7 +351,7 @@ func insertWorkflowNodeJobRun(db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) er
return nil
}

//DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run
// DeleteNodeJobRuns deletes all workflow_node_run_job for a given workflow_node_run
func DeleteNodeJobRuns(db gorp.SqlExecutor, nodeID int64) error {
query := `delete from workflow_node_run_job where workflow_node_run_id = $1`
_, err := db.Exec(query, nodeID)
Expand All @@ -343,7 +365,7 @@ func DeleteNodeJobRun(db gorp.SqlExecutor, nodeRunJob int64) error {
return err
}

//UpdateNodeJobRun updates a workflow_node_run_job
// UpdateNodeJobRun updates a workflow_node_run_job
func UpdateNodeJobRun(ctx context.Context, db gorp.SqlExecutor, j *sdk.WorkflowNodeJobRun) error {
var end func()
_, end = telemetry.Span(ctx, "workflow.UpdateNodeJobRun")
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func InsertRunNum(db gorp.SqlExecutor, w *sdk.Workflow, num int64) error {
return nil
}

func LoadCratingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
func LoadCraftingWorkflowRunIDs(db gorp.SqlExecutor) ([]int64, error) {
query := `
SELECT id
FROM workflow_run
Expand Down
5 changes: 1 addition & 4 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,7 @@ func executeNodeRun(ctx context.Context, db gorpmapper.SqlExecutorWithTx, store
}
// End of temporary debug

// Delete the line in workflow_node_run_job
if err := DeleteNodeJobRuns(db, workflowNodeRun.ID); err != nil {
return nil, sdk.WrapError(err, "unable to delete node %d job runs", workflowNodeRun.ID)
}
// Delete the line in workflow_node_run_job is done asynchronously in a goroutine at api level

// If current node has a mutex, we want to trigger another node run that can be waiting for the mutex
node := updatedWorkflowRun.Workflow.WorkflowData.NodeByID(workflowNodeRun.WorkflowNodeID)
Expand Down
54 changes: 53 additions & 1 deletion engine/api/workflow_run_craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (api *API) WorkflowRunCraft(ctx context.Context, tick time.Duration) {
}
return
case <-ticker.C:
ids, err := workflow.LoadCratingWorkflowRunIDs(api.mustDB())
ids, err := workflow.LoadCraftingWorkflowRunIDs(api.mustDB())
if err != nil {
log.Error(ctx, "WorkflowRunCraft> unable to start tx: %v", err)
continue
Expand Down Expand Up @@ -166,3 +166,55 @@ func (api *API) workflowRunCraft(ctx context.Context, id int64) error {

return workflow.UpdateCraftedWorkflowRun(api.mustDB(), run.ID)
}

func (api *API) WorkflowRunJobDeletion(ctx context.Context, tick time.Duration, limit int) {
ticker := time.NewTicker(tick)
defer ticker.Stop()

mainLoop:
for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
log.Error(ctx, "%v", ctx.Err())
}
return
case <-ticker.C:
tx, err := api.mustDB().Begin()
if err != nil {
log.ErrorWithStackTrace(ctx, err)
continue
}
jobs, err := workflow.LoadAndLockTerminatedNodeJobRun(ctx, tx, limit)
if err != nil {
log.Error(ctx, "WorkflowRunJobDeletion> unable to start tx: %v", err)
_ = tx.Rollback()
continue
}
for i := range jobs {
j := &jobs[i]
node, err := workflow.LoadNodeRunByID(ctx, tx, j.WorkflowNodeRunID, workflow.LoadRunOptions{})
if err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to load NodeRun %d", j.WorkflowNodeRunID))
_ = tx.Rollback()
continue mainLoop
}

if !sdk.StatusIsTerminated(node.Status) {
continue
}

if err := workflow.DeleteNodeJobRun(tx, j.ID); err != nil {
log.ErrorWithStackTrace(ctx, sdk.WrapError(err, "unable to delete WorkflowNodeJobRun %d", j.ID))
_ = tx.Rollback()
continue mainLoop
}
}
if err := tx.Commit(); err != nil {
log.Error(ctx, "WorkflowRunJobDeletion> unable to commit tx: %v", err)
_ = tx.Rollback()
continue
}
}
}
}