From 2aa2f53e7d24a4b6d0b4ee27f8671ce459153031 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Thu, 7 Nov 2024 20:34:30 +0100 Subject: [PATCH] [CAPPL-98] fix wrong standard capability should not block initialisation (#15138) * fix: wrong standard capability should not block initialisation * feat: implement timeout * fix: lint --- .../standard_capabilities.go | 52 +++++++--- .../standard_capabilities_test.go | 99 +++++++++++++++++++ 2 files changed, 137 insertions(+), 14 deletions(-) create mode 100644 core/services/standardcapabilities/standard_capabilities_test.go diff --git a/core/services/standardcapabilities/standard_capabilities.go b/core/services/standardcapabilities/standard_capabilities.go index fe3dad7bb2f..76ca5fa1952 100644 --- a/core/services/standardcapabilities/standard_capabilities.go +++ b/core/services/standardcapabilities/standard_capabilities.go @@ -3,6 +3,8 @@ package standardcapabilities import ( "context" "fmt" + "sync" + "time" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -12,6 +14,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/plugins" ) +const defaultStartTimeout = 3 * time.Minute + type standardCapabilities struct { services.StateMachine log logger.Logger @@ -26,6 +30,10 @@ type standardCapabilities struct { oracleFactory core.OracleFactory capabilitiesLoop *loop.StandardCapabilitiesService + + wg sync.WaitGroup + stopChan services.StopChan + startTimeout time.Duration } func newStandardCapabilities( @@ -51,6 +59,7 @@ func newStandardCapabilities( pipelineRunner: pipelineRunner, relayerSet: relayerSet, oracleFactory: oracleFactory, + stopChan: make(chan struct{}), } } @@ -63,38 +72,53 @@ func (s *standardCapabilities) Start(ctx context.Context) error { Cmd: cmdName, Env: nil, }) - if err != nil { return fmt.Errorf("error registering loop: %v", err) } s.capabilitiesLoop = loop.NewStandardCapabilitiesService(s.log, opts, cmdFn) - if err = s.capabilitiesLoop.Start(ctx); err != nil { return fmt.Errorf("error starting standard capabilities service: %v", err) } - if err = s.capabilitiesLoop.WaitCtx(ctx); err != nil { - return fmt.Errorf("error waiting for standard capabilities service to start: %v", err) - } + s.wg.Add(1) + go func() { + defer s.wg.Done() - if err = s.capabilitiesLoop.Service.Initialise(ctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog, - s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil { - return fmt.Errorf("error initialising standard capabilities service: %v", err) - } + if s.startTimeout == 0 { + s.startTimeout = defaultStartTimeout + } - capabilityInfos, err := s.capabilitiesLoop.Service.Infos(ctx) - if err != nil { - return fmt.Errorf("error getting standard capabilities service info: %v", err) - } + cctx, cancel := s.stopChan.CtxWithTimeout(s.startTimeout) + defer cancel() + + if err = s.capabilitiesLoop.WaitCtx(cctx); err != nil { + s.log.Errorf("error waiting for standard capabilities service to start: %v", err) + return + } + + if err = s.capabilitiesLoop.Service.Initialise(cctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog, + s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil { + s.log.Errorf("error initialising standard capabilities service: %v", err) + return + } + + capabilityInfos, err := s.capabilitiesLoop.Service.Infos(cctx) + if err != nil { + s.log.Errorf("error getting standard capabilities service info: %v", err) + return + } - s.log.Info("Started standard capabilities for job spec", "spec", s.spec, "capabilities", capabilityInfos) + s.log.Info("Started standard capabilities for job spec", "spec", s.spec, "capabilities", capabilityInfos) + }() return nil }) } func (s *standardCapabilities) Close() error { + close(s.stopChan) + s.wg.Wait() return s.StopOnce("StandardCapabilities", func() error { if s.capabilitiesLoop != nil { return s.capabilitiesLoop.Close() diff --git a/core/services/standardcapabilities/standard_capabilities_test.go b/core/services/standardcapabilities/standard_capabilities_test.go new file mode 100644 index 00000000000..538e08c65ad --- /dev/null +++ b/core/services/standardcapabilities/standard_capabilities_test.go @@ -0,0 +1,99 @@ +package standardcapabilities + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/plugins" +) + +func TestStandardCapabilityStart(t *testing.T) { + t.Run("NOK-not_found_binary_does_not_block", func(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + + pluginRegistrar := plugins.NewRegistrarConfig(loop.GRPCOpts{}, func(name string) (*plugins.RegisteredLoop, error) { return &plugins.RegisteredLoop{}, nil }, func(loopId string) {}) + registry := mocks.NewCapabilitiesRegistry(t) + + spec := &job.StandardCapabilitiesSpec{ + Command: "not/found/path/to/binary", + OracleFactory: job.OracleFactoryConfig{ + Enabled: true, + BootstrapPeers: []string{ + "12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690", + }, + OCRContractAddress: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + ChainID: "31337", + Network: "evm", + }} + + standardCapability := newStandardCapabilities(lggr, spec, pluginRegistrar, &telemetryServiceMock{}, &kvstoreMock{}, registry, &errorLogMock{}, &pipelineRunnerServiceMock{}, &relayerSetMock{}, &oracleFactoryMock{}) + standardCapability.startTimeout = 1 * time.Second + err := standardCapability.Start(ctx) + require.NoError(t, err) + + standardCapability.wg.Wait() + }) +} + +type telemetryServiceMock struct{} + +func (t *telemetryServiceMock) Send(ctx context.Context, network string, chainID string, contractID string, telemetryType string, payload []byte) error { + return nil +} + +type kvstoreMock struct{} + +func (k *kvstoreMock) Store(ctx context.Context, key string, val []byte) error { + return nil +} +func (k *kvstoreMock) Get(ctx context.Context, key string) ([]byte, error) { + return nil, nil +} + +type errorLogMock struct{} + +func (e *errorLogMock) SaveError(ctx context.Context, msg string) error { + return nil +} + +type relayerSetMock struct{} + +func (r *relayerSetMock) Get(ctx context.Context, relayID types.RelayID) (core.Relayer, error) { + return nil, nil +} +func (r *relayerSetMock) List(ctx context.Context, relayIDs ...types.RelayID) (map[types.RelayID]core.Relayer, error) { + return nil, nil +} + +type pipelineRunnerServiceMock struct{} + +func (p *pipelineRunnerServiceMock) ExecuteRun(ctx context.Context, spec string, vars core.Vars, options core.Options) (core.TaskResults, error) { + return nil, nil +} + +type oracleFactoryMock struct{} + +func (o *oracleFactoryMock) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) { + return &oracleMock{}, nil +} + +type oracleMock struct{} + +func (o *oracleMock) Start(ctx context.Context) error { + return nil +} +func (o *oracleMock) Close(ctx context.Context) error { + return nil +}