Skip to content

Commit

Permalink
[CAPPL-98] fix wrong standard capability should not block initialisat…
Browse files Browse the repository at this point in the history
…ion (#15138)

* fix: wrong standard capability should not block initialisation

* feat: implement timeout

* fix: lint
  • Loading branch information
agparadiso authored Nov 7, 2024
1 parent 3167b0a commit 2aa2f53
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 14 deletions.
52 changes: 38 additions & 14 deletions core/services/standardcapabilities/standard_capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package standardcapabilities
import (
"context"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -12,6 +14,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/plugins"
)

const defaultStartTimeout = 3 * time.Minute

type standardCapabilities struct {
services.StateMachine
log logger.Logger
Expand All @@ -26,6 +30,10 @@ type standardCapabilities struct {
oracleFactory core.OracleFactory

capabilitiesLoop *loop.StandardCapabilitiesService

wg sync.WaitGroup
stopChan services.StopChan
startTimeout time.Duration
}

func newStandardCapabilities(
Expand All @@ -51,6 +59,7 @@ func newStandardCapabilities(
pipelineRunner: pipelineRunner,
relayerSet: relayerSet,
oracleFactory: oracleFactory,
stopChan: make(chan struct{}),
}
}

Expand All @@ -63,38 +72,53 @@ func (s *standardCapabilities) Start(ctx context.Context) error {
Cmd: cmdName,
Env: nil,
})

if err != nil {
return fmt.Errorf("error registering loop: %v", err)
}

s.capabilitiesLoop = loop.NewStandardCapabilitiesService(s.log, opts, cmdFn)

if err = s.capabilitiesLoop.Start(ctx); err != nil {
return fmt.Errorf("error starting standard capabilities service: %v", err)
}

if err = s.capabilitiesLoop.WaitCtx(ctx); err != nil {
return fmt.Errorf("error waiting for standard capabilities service to start: %v", err)
}
s.wg.Add(1)
go func() {
defer s.wg.Done()

if err = s.capabilitiesLoop.Service.Initialise(ctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog,
s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil {
return fmt.Errorf("error initialising standard capabilities service: %v", err)
}
if s.startTimeout == 0 {
s.startTimeout = defaultStartTimeout
}

capabilityInfos, err := s.capabilitiesLoop.Service.Infos(ctx)
if err != nil {
return fmt.Errorf("error getting standard capabilities service info: %v", err)
}
cctx, cancel := s.stopChan.CtxWithTimeout(s.startTimeout)
defer cancel()

if err = s.capabilitiesLoop.WaitCtx(cctx); err != nil {
s.log.Errorf("error waiting for standard capabilities service to start: %v", err)
return
}

if err = s.capabilitiesLoop.Service.Initialise(cctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog,
s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil {
s.log.Errorf("error initialising standard capabilities service: %v", err)
return
}

capabilityInfos, err := s.capabilitiesLoop.Service.Infos(cctx)
if err != nil {
s.log.Errorf("error getting standard capabilities service info: %v", err)
return
}

s.log.Info("Started standard capabilities for job spec", "spec", s.spec, "capabilities", capabilityInfos)
s.log.Info("Started standard capabilities for job spec", "spec", s.spec, "capabilities", capabilityInfos)
}()

return nil
})
}

func (s *standardCapabilities) Close() error {
close(s.stopChan)
s.wg.Wait()
return s.StopOnce("StandardCapabilities", func() error {
if s.capabilitiesLoop != nil {
return s.capabilitiesLoop.Close()
Expand Down
99 changes: 99 additions & 0 deletions core/services/standardcapabilities/standard_capabilities_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package standardcapabilities

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

func TestStandardCapabilityStart(t *testing.T) {
t.Run("NOK-not_found_binary_does_not_block", func(t *testing.T) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)

pluginRegistrar := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return &plugins.RegisteredLoop{}, nil }, func(loopId string) {})
registry := mocks.NewCapabilitiesRegistry(t)

spec := &job.StandardCapabilitiesSpec{
Command: "not/found/path/to/binary",
OracleFactory: job.OracleFactoryConfig{
Enabled: true,
BootstrapPeers: []string{
"12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690",
},
OCRContractAddress: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6",
ChainID: "31337",
Network: "evm",
}}

standardCapability := newStandardCapabilities(lggr, spec, pluginRegistrar, &telemetryServiceMock{}, &kvstoreMock{}, registry, &errorLogMock{}, &pipelineRunnerServiceMock{}, &relayerSetMock{}, &oracleFactoryMock{})
standardCapability.startTimeout = 1 * time.Second
err := standardCapability.Start(ctx)
require.NoError(t, err)

standardCapability.wg.Wait()
})
}

type telemetryServiceMock struct{}

func (t *telemetryServiceMock) Send(ctx context.Context, network string, chainID string, contractID string, telemetryType string, payload []byte) error {
return nil
}

type kvstoreMock struct{}

func (k *kvstoreMock) Store(ctx context.Context, key string, val []byte) error {
return nil
}
func (k *kvstoreMock) Get(ctx context.Context, key string) ([]byte, error) {
return nil, nil
}

type errorLogMock struct{}

func (e *errorLogMock) SaveError(ctx context.Context, msg string) error {
return nil
}

type relayerSetMock struct{}

func (r *relayerSetMock) Get(ctx context.Context, relayID types.RelayID) (core.Relayer, error) {
return nil, nil
}
func (r *relayerSetMock) List(ctx context.Context, relayIDs ...types.RelayID) (map[types.RelayID]core.Relayer, error) {
return nil, nil
}

type pipelineRunnerServiceMock struct{}

func (p *pipelineRunnerServiceMock) ExecuteRun(ctx context.Context, spec string, vars core.Vars, options core.Options) (core.TaskResults, error) {
return nil, nil
}

type oracleFactoryMock struct{}

func (o *oracleFactoryMock) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) {
return &oracleMock{}, nil
}

type oracleMock struct{}

func (o *oracleMock) Start(ctx context.Context) error {
return nil
}
func (o *oracleMock) Close(ctx context.Context) error {
return nil
}

0 comments on commit 2aa2f53

Please sign in to comment.