Skip to content

Commit

Permalink
GetUnfinished fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Nov 7, 2024
1 parent a98566c commit cd8c663
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 8 deletions.
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
)

workflowRegistrySyncer := &syncer.WorkflowRegistry{
DS: opts.DS,
Logger: globalLogger.Named("WorkflowRegistrySyncer"),
Store: workflowORM,
Registry: opts.CapabilitiesRegistry,
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ var (
)

func (e *Engine) resumeInProgressExecutions(ctx context.Context) error {
wipExecutions, err := e.executionStates.GetUnfinished(ctx, defaultOffset, defaultLimit)
wipExecutions, err := e.executionStates.GetUnfinished(ctx, e.workflow.id, defaultOffset, defaultLimit)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
)

const testWorkflowId = "<workflow-id>"
Expand Down Expand Up @@ -149,6 +148,12 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string,
return newTestEngine(t, reg, sdkSpec, opts...)
}

type mockSecretsFetcher struct{}

func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) {
return map[string]string{}, nil
}

// newTestEngine creates a new engine with some test defaults.
func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec, opts ...func(c *Config)) (*Engine, *testHooks) {
initFailed := make(chan struct{})
Expand All @@ -174,7 +179,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, sdkSpec sdk.WorkflowSpec
onExecutionFinished: func(weid string) {
executionFinished <- weid
},
SecretsFetcher: syncer.NewWorkflowRegistry(),
SecretsFetcher: mockSecretsFetcher{},
clock: clock,
}
for _, o := range opts {
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type Store interface {
UpsertStep(ctx context.Context, step *WorkflowExecutionStep) (WorkflowExecution, error)
UpdateStatus(ctx context.Context, executionID string, status string) error
Get(ctx context.Context, executionID string) (WorkflowExecution, error)
GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error)
GetUnfinished(ctx context.Context, workflowID string, offset, limit int) ([]WorkflowExecution, error)
}

var _ Store = (*DBStore)(nil)
7 changes: 4 additions & 3 deletions core/services/workflows/store/store_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (d *DBStore) transact(ctx context.Context, fn func(*DBStore) error) error {
)
}

func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]WorkflowExecution, error) {
func (d *DBStore) GetUnfinished(ctx context.Context, workflowID string, offset, limit int) ([]WorkflowExecution, error) {
sql := `
SELECT
workflow_steps.workflow_execution_id AS ws_workflow_execution_id,
Expand All @@ -382,9 +382,10 @@ func (d *DBStore) GetUnfinished(ctx context.Context, offset, limit int) ([]Workf
JOIN workflow_steps
ON workflow_steps.workflow_execution_id = workflow_executions.id
WHERE workflow_executions.status = $1
AND workflow_executions.workflow_id = $2
ORDER BY workflow_executions.created_at DESC
LIMIT $2
OFFSET $3
LIMIT $3
OFFSET $4
`
var joinRecords []workflowExecutionWithStep
err := d.db.SelectContext(ctx, &joinRecords, sql, StatusStarted, limit, offset)
Expand Down
19 changes: 18 additions & 1 deletion core/services/workflows/store/store_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func Test_StoreDB_GetUnfinishedSteps(t *testing.T) {
store := newTestDBStore(t)

id := randomID()
wid := randomID()
stepOne := &WorkflowExecutionStep{
ExecutionID: id,
Ref: "step1",
Expand All @@ -254,12 +255,28 @@ func Test_StoreDB_GetUnfinishedSteps(t *testing.T) {
"step2": stepTwo,
},
ExecutionID: id,
WorkflowID: wid,
Status: StatusStarted,
}

_, err := store.Add(tests.Context(t), &es)
require.NoError(t, err)

id2 := randomID()
wid2 := randomID()
es2 := WorkflowExecution{
Steps: map[string]*WorkflowExecutionStep{
"step1": stepOne,
"step2": stepTwo,
},
ExecutionID: id2,
WorkflowID: wid2,
Status: StatusStarted,
}

_, err = store.Add(tests.Context(t), &es2)
require.NoError(t, err)

id = randomID()
esTwo := WorkflowExecution{
ExecutionID: id,
Expand All @@ -269,7 +286,7 @@ func Test_StoreDB_GetUnfinishedSteps(t *testing.T) {
_, err = store.Add(tests.Context(t), &esTwo)
require.NoError(t, err)

states, err := store.GetUnfinished(tests.Context(t), 0, 100)
states, err := store.GetUnfinished(tests.Context(t), wid, 0, 100)
require.NoError(t, err)

assert.Len(t, states, 1)
Expand Down
18 changes: 18 additions & 0 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"

Expand Down Expand Up @@ -36,6 +37,7 @@ type WorkflowRegistry struct {
Logger logger.Logger
Registry core.CapabilitiesRegistry
Store store.Store
DS sqlutil.DataSource
subServices []job.ServiceCtx
}

Expand Down Expand Up @@ -71,6 +73,22 @@ func (w *WorkflowRegistry) trySetup() bool {
return true
}

job := job.WorkflowSpec{
Workflow: string(workflow),
Config: string(config),
WorkflowID: workflowID,
WorkflowName: workflowName,
WorkflowOwner: workflowOwner,
}
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config)
RETURNING id;`
_, err = w.DS.NamedExecContext(ctx, sql, job)
if err != nil {
w.Logger.Info("failed to create entry: %w", err)
return false
}

moduleConfig := &host.ModuleConfig{Logger: logger.NullLogger}
spec, err := host.GetWorkflowSpec(ctx, moduleConfig, workflow, config)
if err != nil {
Expand Down

0 comments on commit cd8c663

Please sign in to comment.