Skip to content

Commit

Permalink
DF-20481: Add new OCR3DataFeeds telemetry type for Mercury jobs (#14470)
Browse files Browse the repository at this point in the history
* Add new OCR3DataFeeds telemetry type for Mercury jobs

* Update plugin_test.go

* Changeset

* Update curvy-beans-scream.md

* Update .changeset/curvy-beans-scream.md

* Update curvy-beans-scream.md
  • Loading branch information
austinborn authored Sep 18, 2024
1 parent 358fc17 commit 5885454
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/curvy-beans-scream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#changed: Add new OCR3DataFeeds telemetry type for Mercury jobs
19 changes: 17 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,21 @@ func (d *Delegate) newServicesMercury(
lggr.ErrorIf(d.jobORM.RecordError(ctx, jb.ID, msg), "unable to record error")
})

var relayConfig evmrelaytypes.RelayConfig
err = json.Unmarshal(jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
return nil, fmt.Errorf("error while unmarshalling relay config: %w", err)
}

var telemetryType synchronization.TelemetryType
if relayConfig.EnableTriggerCapability && jb.OCR2OracleSpec.PluginConfig == nil {
telemetryType = synchronization.OCR3DataFeeds
// First use case for TriggerCapability transmission is Data Feeds, so telemetry should be routed accordingly.
// This is only true if TriggerCapability is the *only* transmission method (PluginConfig == nil).
} else {
telemetryType = synchronization.OCR3Mercury
}

oracleArgsNoPlugin := libocr2.MercuryOracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Expand All @@ -890,7 +905,7 @@ func (d *Delegate) newServicesMercury(
Database: ocrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.OCR3Mercury),
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), telemetryType),
OffchainConfigDigester: mercuryProvider.OffchainConfigDigester(),
OffchainKeyring: kb,
OnchainKeyring: kb,
Expand All @@ -901,7 +916,7 @@ func (d *Delegate) newServicesMercury(

mCfg := mercury.NewMercuryConfig(d.cfg.JobPipeline().MaxSuccessfulRuns(), d.cfg.JobPipeline().ResultWriteQueueDepth(), d.cfg)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, lggr, oracleArgsNoPlugin, mCfg, chEnhancedTelem, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID), relayConfig.EnableTriggerCapability)

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
Expand Down
15 changes: 5 additions & 10 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
mercuryv2 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v2"
mercuryv3 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3"
mercuryv4 "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v4"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

Expand Down Expand Up @@ -74,20 +73,16 @@ func NewServices(
chEnhancedTelem chan ocrcommon.EnhancedTelemetryMercuryData,
orm types.DataSourceORM,
feedID utils.FeedID,
enableTriggerCapability bool,
) ([]job.ServiceCtx, error) {
if jb.PipelineSpec == nil {
return nil, errors.New("expected job to have a non-nil PipelineSpec")
}

var relayConfig evmtypes.RelayConfig
err := json.Unmarshal(jb.OCR2OracleSpec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
return nil, fmt.Errorf("error while unmarshalling relay config: %w", err)
}

var err error
var pluginConfig config.PluginConfig
if jb.OCR2OracleSpec.PluginConfig == nil {
if !relayConfig.EnableTriggerCapability {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
} else {
Expand All @@ -106,8 +101,8 @@ func NewServices(
// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if cerr := services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
if err = services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", err)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr2/plugins/mercury/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func newServicesTestWrapper(t *testing.T, pluginConfig job.JSONConfig, feedID ut
t.Helper()
jb := testJob
jb.OCR2OracleSpec.PluginConfig = pluginConfig
return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID)
return mercuryocr2.NewServices(jb, &testProvider{}, nil, logger.TestLogger(t), testArgsNoPlugin, testCfg, nil, &testDataSourceORM{}, feedID, false)
}

type testProvider struct{}
Expand Down
1 change: 1 addition & 0 deletions core/services/synchronization/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
OCR2S4 TelemetryType = "ocr2-s4"
OCR2Median TelemetryType = "ocr2-median"
OCR3Mercury TelemetryType = "ocr3-mercury"
OCR3DataFeeds TelemetryType = "ocr3-data-feeds"
AutomationCustom TelemetryType = "automation-custom"
OCR3Automation TelemetryType = "ocr3-automation"
OCR3Rebalancer TelemetryType = "ocr3-rebalancer"
Expand Down
2 changes: 1 addition & 1 deletion core/services/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (m *Manager) GenMonitoringEndpoint(network string, chainID string, contract
e, found := m.getEndpoint(network, chainID)

if !found {
m.eng.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contactID %q will NOT be sent", network, chainID, telemType, contractID)
m.eng.Warnf("no telemetry endpoint found for network %q chainID %q, telemetry %q for contractID %q will NOT be sent", network, chainID, telemType, contractID)
return &NoopAgent{}
}

Expand Down

0 comments on commit 5885454

Please sign in to comment.