Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into INFOPLAT-1071-beholde…
Browse files Browse the repository at this point in the history
…r-csa-signer-auth_2
  • Loading branch information
4of9 committed Nov 8, 2024
2 parents b969b66 + 4ae4553 commit 7bd523f
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 40 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/consul/sdk v0.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99
github.com/hashicorp/go-plugin v1.6.2
github.com/iancoleman/strcase v0.3.0
github.com/invopop/jsonschema v0.12.0
github.com/jmoiron/sqlx v1.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ github.com/hashicorp/consul/sdk v0.16.0 h1:SE9m0W6DEfgIVCJX7xU+iv/hUl4m/nxqMTnCd
github.com/hashicorp/consul/sdk v0.16.0/go.mod h1:7pxqqhqoaPqnBnzXD1StKed62LqJeClzVsUEy85Zr0A=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99 h1:OSQYEsRT3tRttZkk6zyC3aAaliwd7Loi/KgXgXxGtwA=
github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99/go.mod h1:CkgLQ5CZqNmdL9U9JzM532t8ZiYQ35+pj3b1FD37R0Q=
github.com/hashicorp/go-plugin v1.6.2 h1:zdGAEd0V1lCaU0u+MxWQhtSDQmahpkwOun8U8EiRVog=
github.com/hashicorp/go-plugin v1.6.2/go.mod h1:CkgLQ5CZqNmdL9U9JzM532t8ZiYQ35+pj3b1FD37R0Q=
github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type IdenticalConsensusConfig[T any] struct {
Encoder Encoder
EncoderConfig EncoderConfig
ReportID ReportId
KeyID KeyId
}

func (c IdenticalConsensusConfig[T]) New(w *sdk.WorkflowSpecFactory, ref string, input IdenticalConsensusInput[T]) SignedReportCap {
Expand All @@ -23,6 +24,7 @@ func (c IdenticalConsensusConfig[T]) New(w *sdk.WorkflowSpecFactory, ref string,
"encoder_config": c.EncoderConfig,
"aggregation_method": "identical",
"report_id": c.ReportID,
"key_id": c.KeyID,
},
CapabilityType: capabilities.CapabilityTypeConsensus,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestIdenticalConsensus(t *testing.T) {
Encoder: ocr3.EncoderEVM,
EncoderConfig: ocr3.EncoderConfig{},
ReportID: "0001",
KeyID: "evm",
}.New(workflow, "consensus", ocr3.IdenticalConsensusInput[basictrigger.TriggerOutputs]{
Observation: trigger,
Encoder: "evm",
Expand Down Expand Up @@ -71,6 +72,7 @@ func TestIdenticalConsensus(t *testing.T) {
"encoder_config": map[string]any{},
"aggregation_method": "identical",
"report_id": "0001",
"key_id": "evm",
},
CapabilityType: capabilities.CapabilityTypeConsensus,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/capabilities/consensus/ocr3/ocr3cap/reduce_consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type ReduceConsensusConfig[T any] struct {
Encoder Encoder
EncoderConfig EncoderConfig
ReportID ReportId
KeyID KeyId
AggregationConfig aggregators.ReduceAggConfig
}

Expand All @@ -26,6 +27,7 @@ func (c ReduceConsensusConfig[T]) New(w *sdk.WorkflowSpecFactory, ref string, in
"encoder": c.Encoder,
"encoder_config": c.EncoderConfig,
"report_id": c.ReportID,
"key_id": c.KeyID,
},
CapabilityType: capabilities.CapabilityTypeConsensus,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestReduceConsensus(t *testing.T) {
Encoder: ocr3.EncoderEVM,
EncoderConfig: ocr3.EncoderConfig{},
ReportID: "0001",
KeyID: "evm",
AggregationConfig: aggregators.ReduceAggConfig{
Fields: []aggregators.AggregationField{
{
Expand Down Expand Up @@ -103,6 +104,7 @@ func TestReduceConsensus(t *testing.T) {
"encoder_config": map[string]any{},
"report_id": "0001",
"aggregation_method": "reduce",
"key_id": "evm",
"aggregation_config": map[string]any{
"outputFieldName": "Reports",
"reportFormat": "array",
Expand Down
6 changes: 4 additions & 2 deletions pkg/capabilities/triggers/mercury_remote_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ type mercuryRemoteAggregator struct {
allowedSigners [][]byte
minRequiredSignatures int
previousLatestReports map[datastreams.FeedID]datastreams.FeedReport
capID string
lggr logger.Logger
}

// This aggregator is used by TriggerSubscriber to aggregate trigger events from multiple remote nodes.
// NOTE: Once Mercury supports parallel composition (and thus guarantee identical sets of reports),
// this will be replaced by the default MODE aggregator.
func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, lggr logger.Logger) *mercuryRemoteAggregator {
func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners [][]byte, minRequiredSignatures int, capID string, lggr logger.Logger) *mercuryRemoteAggregator {
if allowedSigners == nil {
allowedSigners = [][]byte{}
}
Expand All @@ -30,6 +31,7 @@ func NewMercuryRemoteAggregator(codec datastreams.ReportCodec, allowedSigners []
allowedSigners: allowedSigners,
minRequiredSignatures: minRequiredSignatures,
previousLatestReports: make(map[datastreams.FeedID]datastreams.FeedReport),
capID: capID,
lggr: lggr,
}
}
Expand Down Expand Up @@ -91,5 +93,5 @@ func (a *mercuryRemoteAggregator) Aggregate(triggerEventID string, responses [][
Signers: a.allowedSigners,
MinRequiredSignatures: a.minRequiredSignatures,
}
return wrapReports(reportList, triggerEventID, latestGlobalTs, meta)
return wrapReports(reportList, triggerEventID, latestGlobalTs, meta, a.capID)
}
5 changes: 3 additions & 2 deletions pkg/capabilities/triggers/mercury_remote_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
eventID = "ev_id_1"
rawReport1 = "abcd"
rawReport2 = "efgh"
capID = "streams-trigger@3.2.1"
)

type testMercuryCodec struct {
Expand All @@ -36,7 +37,7 @@ func (c testMercuryCodec) Wrap(reports []datastreams.FeedReport) (values.Value,
}

func TestMercuryRemoteAggregator(t *testing.T) {
agg := NewMercuryRemoteAggregator(testMercuryCodec{}, nil, 0, logger.Nop())
agg := NewMercuryRemoteAggregator(testMercuryCodec{}, nil, 0, capID, logger.Nop())
signatures := [][]byte{{1, 2, 3}}

feed1Old := datastreams.FeedReport{
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestMercuryRemoteAggregator(t *testing.T) {
}

func getRawResponse(t *testing.T, reports []datastreams.FeedReport, timestamp int64) []byte {
resp, err := wrapReports(reports, eventID, timestamp, datastreams.Metadata{})
resp, err := wrapReports(reports, eventID, timestamp, datastreams.Metadata{}, capID)
require.NoError(t, err)
rawResp, err := pb.MarshalTriggerResponse(resp)
require.NoError(t, err)
Expand Down
45 changes: 27 additions & 18 deletions pkg/capabilities/triggers/mercury_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

const triggerID = "streams-trigger@1.0.0"

var capInfo = capabilities.MustNewCapabilityInfo(
triggerID,
capabilities.CapabilityTypeTrigger,
"Streams Trigger",
const (
defaultCapabilityName = "streams-trigger"
defaultCapabilityVersion = "1.0.0"
defaultTickerResolutionMs = 1000
// TODO pending capabilities configuration implementation - this should be configurable with a sensible default
defaultSendChannelBufferSize = 1000
)

const defaultTickerResolutionMs = 1000

// TODO pending capabilities configuration implementation - this should be configurable with a sensible default
const defaultSendChannelBufferSize = 1000

// This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service.
type MercuryTriggerService struct {
capabilities.CapabilityInfo
Expand All @@ -54,17 +49,31 @@ type subscriber struct {
// Mercury Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber).
// Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore,
// all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs.
func NewMercuryTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MercuryTriggerService {
func NewMercuryTriggerService(tickerResolutionMs int64, capName string, capVersion string, lggr logger.Logger) (*MercuryTriggerService, error) {
if tickerResolutionMs == 0 {
tickerResolutionMs = defaultTickerResolutionMs
}
if capName == "" {
capName = defaultCapabilityName
}
if capVersion == "" {
capVersion = defaultCapabilityVersion
}
capInfo, err := capabilities.NewCapabilityInfo(
capName+"@"+capVersion,
capabilities.CapabilityTypeTrigger,
"Streams Trigger",
)
if err != nil {
return nil, err
}
return &MercuryTriggerService{
CapabilityInfo: capInfo,
tickerResolutionMs: tickerResolutionMs,
subscribers: make(map[string]*subscriber),
latestReports: make(map[datastreams.FeedID]datastreams.FeedReport),
stopCh: make(services.StopChan),
lggr: logger.Named(lggr, "MercuryTriggerService")}
lggr: logger.Named(lggr, "MercuryTriggerService")}, nil
}

func (o *MercuryTriggerService) SetMetaOverride(meta datastreams.Metadata) {
Expand Down Expand Up @@ -95,7 +104,7 @@ func (o *MercuryTriggerService) RegisterTrigger(ctx context.Context, req capabil

// If triggerId is already registered, return an error
if _, ok := o.subscribers[req.TriggerID]; ok {
return nil, fmt.Errorf("triggerId %s already registered", triggerID)
return nil, fmt.Errorf("triggerId %s already registered", o.ID)
}

if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 {
Expand Down Expand Up @@ -133,7 +142,7 @@ func (o *MercuryTriggerService) UnregisterTrigger(ctx context.Context, req capab

subscriber, ok := o.subscribers[req.TriggerID]
if !ok {
return fmt.Errorf("triggerId %s not registered", triggerID)
return fmt.Errorf("triggerId %s not registered", o.ID)
}
close(subscriber.ch)
delete(o.subscribers, req.TriggerID)
Expand Down Expand Up @@ -186,7 +195,7 @@ func (o *MercuryTriggerService) process(timestamp int64) {

// use 32-byte-padded timestamp as EventID (human-readable)
eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10))
capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.metaOverride)
capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.metaOverride, o.ID)
if err != nil {
o.lggr.Errorw("error wrapping reports", "err", err)
continue
Expand All @@ -202,7 +211,7 @@ func (o *MercuryTriggerService) process(timestamp int64) {
}
}

func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata) (capabilities.TriggerResponse, error) {
func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.Metadata, capID string) (capabilities.TriggerResponse, error) {
out := datastreams.StreamsTriggerEvent{
Payload: reportList,
Metadata: meta,
Expand All @@ -216,7 +225,7 @@ func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp
// Create a new TriggerRegistrationResponse with the MercuryTriggerEvent
return capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: triggerID,
TriggerType: capID,
ID: eventID,
Outputs: outputsv,
},
Expand Down
21 changes: 13 additions & 8 deletions pkg/capabilities/triggers/mercury_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func registerTrigger(
return triggerEventsCh, registerRequest
}

var (
const (
triggerID = "streams-trigger@4.5.6"
feedOne = "0x1111111111111111111100000000000000000000000000000000000000000000"
feedTwo = "0x2222222222222222222200000000000000000000000000000000000000000000"
feedThree = "0x3333333333333333333300000000000000000000000000000000000000000000"
Expand All @@ -60,9 +61,10 @@ var (
)

func TestMercuryTrigger(t *testing.T) {
ts := NewMercuryTriggerService(100, logger.Nop())
ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop())
require.NoError(t, err)
ctx := tests.Context(t)
err := ts.Start(ctx)
err = ts.Start(ctx)
require.NoError(t, err)
// use registerTriggerHelper to register a trigger
callback, registerUnregisterRequest := registerTrigger(
Expand Down Expand Up @@ -100,9 +102,10 @@ func TestMercuryTrigger(t *testing.T) {
}

func TestMultipleMercuryTriggers(t *testing.T) {
ts := NewMercuryTriggerService(100, logger.Nop())
ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop())
require.NoError(t, err)
ctx := tests.Context(t)
err := ts.Start(ctx)
err = ts.Start(ctx)
require.NoError(t, err)
callback1, cr1 := registerTrigger(
ctx,
Expand Down Expand Up @@ -214,7 +217,8 @@ func TestMultipleMercuryTriggers(t *testing.T) {
}

func TestMercuryTrigger_RegisterTriggerErrors(t *testing.T) {
ts := NewMercuryTriggerService(100, logger.Nop())
ts, err := NewMercuryTriggerService(100, "", "4.5.6", logger.Nop())
require.NoError(t, err)
ctx := tests.Context(t)
require.NoError(t, ts.Start(ctx))

Expand Down Expand Up @@ -293,7 +297,8 @@ func TestMercuryTrigger_ConfigValidation(t *testing.T) {
return newConfig(t, []string{feedID}, 1000)
}

ts := NewMercuryTriggerService(1000, logger.Nop())
ts, err := NewMercuryTriggerService(1000, "", "4.5.6", logger.Nop())
require.NoError(t, err)
rawConf := newConfigSingleFeed(t, "012345678901234567890123456789012345678901234567890123456789000000")
conf, err := ts.ValidateConfig(rawConf)
require.Error(t, err)
Expand Down Expand Up @@ -355,7 +360,7 @@ func TestMercuryTrigger_WrapReports(t *testing.T) {
ObservationTimestamp: 876543,
})
}
wrapped, err := wrapReports(reportList, "event_id", 1234, meta)
wrapped, err := wrapReports(reportList, "event_id", 1234, meta, triggerID)
require.NoError(t, err)
require.NotNil(t, wrapped.Event)
require.Len(t, wrapped.Event.Outputs.Underlying["Payload"].(*values.List).Underlying, P)
Expand Down
7 changes: 6 additions & 1 deletion pkg/codec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ func getMapsFromPath(valueMap map[string]any, path []string) ([]map[string]any,
}

iItem := reflect.ValueOf(item)
if iItem.Kind() == reflect.Ptr {
iItem = iItem.Elem()
}

switch iItem.Kind() {
case reflect.Array, reflect.Slice:
length := iItem.Len()
Expand All @@ -340,7 +344,8 @@ func getMapsFromPath(valueMap map[string]any, path []string) ([]map[string]any,

// cleanup empty values for non path keys
for k, v := range m {
if k != p && reflect.ValueOf(v).IsZero() {
valueOfV := reflect.ValueOf(v)
if k != p && valueOfV.IsValid() && valueOfV.IsZero() {
delete(m, k)
}
}
Expand Down
27 changes: 23 additions & 4 deletions pkg/codec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,36 @@ func TestGetMapsFromPath(t *testing.T) {
TestASlice []testA
}

type testC struct {
TestAPtrSlice *[]testA
}

type testStruct struct {
A testA
B testB
C, D int
C testC
D, F int
}

testMap := map[string]any{"A": map[string]any{"B": []testStruct{{B: testB{TestASlice: []testA{{IntSlice: []int{3, 2, 0}}, {IntSlice: []int{0, 1, 2}}}}, C: 10, D: 100}, {C: 20, D: 200}}}}
ptrSlice := &[]testA{{IntSlice: []int{4, 3, 2}}, {IntSlice: []int{1, 2, 3}}}
testMap := map[string]any{"A": map[string]any{"B": []testStruct{{B: testB{TestASlice: []testA{{IntSlice: []int{3, 2, 0}}, {IntSlice: []int{0, 1, 2}}}}, C: testC{TestAPtrSlice: ptrSlice}, D: 10, F: 100}, {D: 20, F: 200}}}}
t.Parallel()
actual, err := getMapsFromPath(testMap, []string{"A"})
require.NoError(t, err)
assert.Equal(t, []map[string]any{{"B": []testStruct{{B: testB{TestASlice: []testA{{IntSlice: []int{3, 2, 0}}, {IntSlice: []int{0, 1, 2}}}}, C: 10, D: 100}, {C: 20, D: 200}}}}, actual)
assert.Equal(t, []map[string]any{{"B": []testStruct{{B: testB{TestASlice: []testA{{IntSlice: []int{3, 2, 0}}, {IntSlice: []int{0, 1, 2}}}}, C: testC{TestAPtrSlice: ptrSlice}, D: 10, F: 100}, {D: 20, F: 200}}}}, actual)

actual, err = getMapsFromPath(testMap, []string{"A", "B"})
require.NoError(t, err)
assert.Equal(t, []map[string]any{{"A": map[string]any{"IntSlice": []int(nil)}, "B": map[string]any{"TestASlice": []testA{{IntSlice: []int{3, 2, 0}}, {IntSlice: []int{0, 1, 2}}}}, "C": 10, "D": 100}, {"A": map[string]any{"IntSlice": []int(nil)}, "B": map[string]any{"TestASlice": []testA(nil)}, "C": 20, "D": 200}}, actual)
assert.Equal(t, []map[string]interface{}{
{
"A": map[string]interface{}{"IntSlice": []int(nil)},
"B": map[string]interface{}{"TestASlice": []testA{{IntSlice: []int{3, 2, 0}}, {IntSlice: []int{0, 1, 2}}}},
"C": map[string]interface{}{"TestAPtrSlice": ptrSlice}, "D": 10, "F": 100},
{
"A": map[string]interface{}{"IntSlice": []int(nil)},
"B": map[string]interface{}{"TestASlice": []testA(nil)}, "C": map[string]interface{}{"TestAPtrSlice": (*[]testA)(nil)}, "D": 20, "F": 200,
},
}, actual)

actual, err = getMapsFromPath(testMap, []string{"A", "B", "B"})
require.NoError(t, err)
Expand All @@ -48,6 +63,10 @@ func TestGetMapsFromPath(t *testing.T) {
actual, err = getMapsFromPath(testMap, []string{"A", "B", "B", "TestASlice"})
require.NoError(t, err)
assert.Equal(t, []map[string]any{{"IntSlice": []int{3, 2, 0}}, {"IntSlice": []int{0, 1, 2}}}, actual)

actual, err = getMapsFromPath(testMap, []string{"A", "B", "C", "TestAPtrSlice"})
require.NoError(t, err)
assert.Equal(t, []map[string]any{{"IntSlice": []int{4, 3, 2}}, {"IntSlice": []int{1, 2, 3}}}, actual)
}

func TestFitsInNBitsSigned(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/loop/internal/goplugin/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (s *PluginService[P, S]) launch() (*plugin.Client, plugin.ClientProtocol, e
s.lggr.Debug("Launching")

cc := s.grpcPlug.ClientConfig()
cc.SkipHostEnv = true
cc.Cmd = s.cmd()
client := plugin.NewClient(cc)
cp, err := client.Client()
Expand Down
Loading

0 comments on commit 7bd523f

Please sign in to comment.