Skip to content

Commit

Permalink
Productionize LLO transmitter (#14355)
Browse files Browse the repository at this point in the history
This mostly copies over similar patterns and structure from the
battle-tested mercury transmitter, with a few modifications to
generalize it to LLO
  • Loading branch information
samsondav authored Sep 11, 2024
1 parent cd72f93 commit 356c70c
Show file tree
Hide file tree
Showing 30 changed files with 1,934 additions and 118 deletions.
21 changes: 21 additions & 0 deletions .changeset/strange-swans-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"chainlink": patch
---

#changed

Productionize transmitter for LLO

Note that some minor changes to prometheus metrics will occur in the transition to LLO. Since feed IDs no longer apply, the metrics for transmissions change as follows:

```
"mercury_transmit_*"
[]string{"feedID", ...},
```

Will change to:

```
"llo_mercury_transmit_*"
[]string{"donID", ...},
```
2 changes: 1 addition & 1 deletion core/cmd/forwarders_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestEVMForwarderPresenter_RenderTable(t *testing.T) {
t.Parallel()

var (
id = "1"
id = "ID:"
address = utils.RandomAddress()
evmChainID = big.NewI(4)
createdAt = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ require (
github.com/smartcontractkit/chain-selectors v1.0.21 // indirect
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240910072312-810030689426 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240910210931-638ba8a76227 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240904093355-e40169857652 // indirect
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc // indirect
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect
github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240910161529-a7050b5193cd // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240910163253-2a5c9ab97de3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,8 @@ github.com/smartcontractkit/chainlink-common v0.2.2-0.20240910154010-ed9f50de732
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240910154010-ed9f50de7322/go.mod h1:D/qaCoq0SxXzg5NRN5FtBRv98VBf+D2NOC++RbvvuOc=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240910210931-638ba8a76227 h1:xow5cYrWxRmzTwhz2AjWOlnI9WRF4O5b84bXm1k292E=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240910210931-638ba8a76227/go.mod h1:DUFantPYoBGwBSkNVt2k4ZJi0jPKRRrZVVlAzcZwreA=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240904093355-e40169857652 h1:0aZ3HiEz2bMM5ywHAyKlFMN95qTzpNDn7uvnHLrFX6s=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240904093355-e40169857652/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc h1:tRmTlaoAt+7FakMXXgeCuRPmzzBo5jsGpeCVvcU6KMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240906125718-9f0a98d32fbc/go.mod h1:PwPcmQNAzVmU8r8JWKrDRgvXesDwxnqbMD6DvYt/Z7M=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f h1:p4p3jBT91EQyLuAMvHD+zNJsuAYI/QjJbzuGUJ7wIgg=
github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f/go.mod h1:FLlWBt2hwiMVgt9AcSo6wBJYIRd/nsc8ENbV1Wir1bw=
github.com/smartcontractkit/chainlink-solana v1.1.1-0.20240910161529-a7050b5193cd h1:+RFI4mgXSGEcn847e7bavhQCqaBiW142g1XfGzijFoY=
Expand Down
47 changes: 47 additions & 0 deletions core/services/llo/mercurytransmitter/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mercurytransmitter

import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
)

func makeSampleReport() ocr3types.ReportWithInfo[llotypes.ReportInfo] {
return ocr3types.ReportWithInfo[llotypes.ReportInfo]{
Report: ocrtypes.Report{1, 2, 3},
Info: llotypes.ReportInfo{
LifeCycleStage: llotypes.LifeCycleStage("production"),
ReportFormat: llotypes.ReportFormatEVMPremiumLegacy,
},
}
}

func makeSampleConfigDigest() ocrtypes.ConfigDigest {
return ocrtypes.ConfigDigest{1, 2, 3, 4, 5, 6}
}
func makeSampleTransmission(seqNr uint64) *Transmission {
// valid with seqnr of 3
return &Transmission{
ServerURL: "wss://example.com/mercury",
ConfigDigest: types.ConfigDigest{0x0, 0x9, 0x57, 0xdd, 0x2f, 0x63, 0x56, 0x69, 0x34, 0xfd, 0xc2, 0xe1, 0xcd, 0xc1, 0xe, 0x3e, 0x25, 0xb9, 0x26, 0x5a, 0x16, 0x23, 0x91, 0xa6, 0x53, 0x16, 0x66, 0x59, 0x51, 0x0, 0x28, 0x7c},
SeqNr: seqNr,
Report: ocr3types.ReportWithInfo[llotypes.ReportInfo]{
Report: ocrtypes.Report{0x0, 0x3, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x66, 0xde, 0xf5, 0xba, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x66, 0xde, 0xf5, 0xba, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1e, 0x8e, 0x95, 0xcf, 0xb5, 0xd8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1a, 0xd0, 0x1c, 0x67, 0xa9, 0xcf, 0xb3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x66, 0xdf, 0x3, 0xca, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1b, 0x1c, 0x93, 0x6d, 0xa4, 0xf2, 0x17, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1b, 0x14, 0x8d, 0x9a, 0xc1, 0xd9, 0x6f, 0xc0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1b, 0x40, 0x5c, 0xcf, 0xa1, 0xbc, 0x63, 0xc0, 0x0},
Info: llotypes.ReportInfo{
LifeCycleStage: llotypes.LifeCycleStage("production"),
ReportFormat: llotypes.ReportFormatEVMPremiumLegacy,
},
},
Sigs: []types.AttributedOnchainSignature{types.AttributedOnchainSignature{Signature: []uint8{0x9d, 0xab, 0x8f, 0xa7, 0xca, 0x7, 0x62, 0x57, 0xf7, 0x11, 0x2c, 0xb7, 0xf3, 0x49, 0x37, 0x12, 0xbd, 0xe, 0x14, 0x27, 0xfc, 0x32, 0x5c, 0xec, 0xa6, 0xb9, 0x7f, 0xf9, 0xd7, 0x7b, 0xa6, 0x36, 0x30, 0x9d, 0x84, 0x29, 0xbf, 0xd4, 0xeb, 0xc5, 0xc9, 0x29, 0xef, 0xdd, 0xd3, 0x2f, 0xa6, 0x25, 0x63, 0xda, 0xd9, 0x2c, 0xa1, 0x4a, 0xba, 0x75, 0xb2, 0x85, 0x25, 0x8f, 0x2b, 0x84, 0xcd, 0x99, 0x1}, Signer: 0x1}, types.AttributedOnchainSignature{Signature: []uint8{0x9a, 0x47, 0x4a, 0x3, 0x1a, 0x95, 0xcf, 0x46, 0x10, 0xaf, 0xcc, 0x90, 0x49, 0xb2, 0xce, 0xbf, 0x63, 0xaa, 0xc7, 0x25, 0x4d, 0x2a, 0x8, 0x36, 0xda, 0xd5, 0x9f, 0x9d, 0x63, 0x69, 0x22, 0xb3, 0x36, 0xd9, 0x6e, 0xf, 0xae, 0x7b, 0xd1, 0x61, 0x59, 0xf, 0x36, 0x4a, 0x22, 0xec, 0xde, 0x45, 0x32, 0xe0, 0x5b, 0x5c, 0xe3, 0x14, 0x29, 0x4, 0x60, 0x7b, 0xce, 0xa3, 0x89, 0x6b, 0xbb, 0xe0, 0x0}, Signer: 0x3}},
}
}

func makeSampleTransmissions() []*Transmission {
return []*Transmission{
makeSampleTransmission(1001),
makeSampleTransmission(1002),
makeSampleTransmission(1003),
}
}
196 changes: 196 additions & 0 deletions core/services/llo/mercurytransmitter/orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package mercurytransmitter

import (
"context"
"errors"
"fmt"
"math"

"github.com/lib/pq"

"github.com/smartcontractkit/libocr/commontypes"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
)

// ORM is scoped to a single DON ID
type ORM interface {
DonID() uint32
Insert(ctx context.Context, transmissions []*Transmission) error
Delete(ctx context.Context, hashes [][32]byte) error
Get(ctx context.Context, serverURL string) ([]*Transmission, error)
Prune(ctx context.Context, serverURL string, maxSize int) error
}

type orm struct {
ds sqlutil.DataSource
donID uint32
}

func NewORM(ds sqlutil.DataSource, donID uint32) ORM {
return &orm{ds: ds, donID: donID}
}

func (o *orm) DonID() uint32 {
return o.donID
}

// Insert inserts the transmissions, ignoring duplicates
func (o *orm) Insert(ctx context.Context, transmissions []*Transmission) error {
if len(transmissions) == 0 {
return nil
}

type transmission struct {
DonID uint32 `db:"don_id"`
ServerURL string `db:"server_url"`
ConfigDigest ocrtypes.ConfigDigest `db:"config_digest"`
SeqNr int64 `db:"seq_nr"`
Report []byte `db:"report"`
LifecycleStage string `db:"lifecycle_stage"`
ReportFormat uint32 `db:"report_format"`
Signatures [][]byte `db:"signatures"`
Signers []uint8 `db:"signers"`
TransmissionHash []byte `db:"transmission_hash"`
}
records := make([]transmission, len(transmissions))
for i, t := range transmissions {
signatures := make([][]byte, len(t.Sigs))
signers := make([]uint8, len(t.Sigs))
for j, sig := range t.Sigs {
signatures[j] = sig.Signature
signers[j] = uint8(sig.Signer)
}
h := t.Hash()
if t.SeqNr > math.MaxInt64 {
// this is to appease the linter but shouldn't ever happen
return fmt.Errorf("seqNr is too large (got: %d, max: %d)", t.SeqNr, math.MaxInt64)
}
records[i] = transmission{
DonID: o.donID,
ServerURL: t.ServerURL,
ConfigDigest: t.ConfigDigest,
SeqNr: int64(t.SeqNr), //nolint
Report: t.Report.Report,
LifecycleStage: string(t.Report.Info.LifeCycleStage),
ReportFormat: uint32(t.Report.Info.ReportFormat),
Signatures: signatures,
Signers: signers,
TransmissionHash: h[:],
}
}

_, err := o.ds.NamedExecContext(ctx, `
INSERT INTO llo_mercury_transmit_queue (don_id, server_url, config_digest, seq_nr, report, lifecycle_stage, report_format, signatures, signers, transmission_hash)
VALUES (:don_id, :server_url, :config_digest, :seq_nr, :report, :lifecycle_stage, :report_format, :signatures, :signers, :transmission_hash)
ON CONFLICT (transmission_hash) DO NOTHING
`, records)

if err != nil {
return fmt.Errorf("llo orm: failed to insert transmissions: %w", err)
}
return nil
}

// Delete deletes the given transmissions
func (o *orm) Delete(ctx context.Context, hashes [][32]byte) error {
if len(hashes) == 0 {
return nil
}

var pqHashes pq.ByteaArray
for _, hash := range hashes {
pqHashes = append(pqHashes, hash[:])
}

_, err := o.ds.ExecContext(ctx, `
DELETE FROM llo_mercury_transmit_queue
WHERE transmission_hash = ANY($1)
`, pqHashes)
if err != nil {
return fmt.Errorf("llo orm: failed to delete transmissions: %w", err)
}
return nil
}

// Get returns all transmissions in chronologically descending order
func (o *orm) Get(ctx context.Context, serverURL string) ([]*Transmission, error) {
// The priority queue uses seqnr to sort transmissions so order by
// the same fields here for optimal insertion into the pq.
rows, err := o.ds.QueryContext(ctx, `
SELECT config_digest, seq_nr, report, lifecycle_stage, report_format, signatures, signers
FROM llo_mercury_transmit_queue
WHERE don_id = $1 AND server_url = $2
ORDER BY seq_nr DESC, transmission_hash DESC
`, o.donID, serverURL)
if err != nil {
return nil, fmt.Errorf("llo orm: failed to get transmissions: %w", err)
}
defer rows.Close()

var transmissions []*Transmission
for rows.Next() {
transmission := Transmission{
ServerURL: serverURL,
}
var digest []byte
var signatures pq.ByteaArray
var signers pq.Int32Array

err := rows.Scan(
&digest,
&transmission.SeqNr,
&transmission.Report.Report,
&transmission.Report.Info.LifeCycleStage,
&transmission.Report.Info.ReportFormat,
&signatures,
&signers,
)
if err != nil {
return nil, fmt.Errorf("llo orm: failed to scan transmission: %w", err)
}
transmission.ConfigDigest = ocrtypes.ConfigDigest(digest)
if len(signatures) != len(signers) {
return nil, errors.New("signatures and signers must have the same length")
}
for i, sig := range signatures {
if signers[i] > math.MaxUint8 {
// this is to appease the linter but shouldn't ever happen
return nil, fmt.Errorf("signer is too large (got: %d, max: %d)", signers[i], math.MaxUint8)
}
transmission.Sigs = append(transmission.Sigs, ocrtypes.AttributedOnchainSignature{
Signature: sig,
Signer: commontypes.OracleID(signers[i]), //nolint
})
}

transmissions = append(transmissions, &transmission)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("llo orm: failed to scan transmissions: %w", err)
}

return transmissions, nil
}

// Prune keeps at most maxSize rows for the given job ID,
// deleting the oldest transactions.
func (o *orm) Prune(ctx context.Context, serverURL string, maxSize int) error {
// Prune the oldest requests by epoch and round.
_, err := o.ds.ExecContext(ctx, `
DELETE FROM llo_mercury_transmit_queue
WHERE don_id = $1 AND server_url = $2 AND
transmission_hash NOT IN (
SELECT transmission_hash
FROM llo_mercury_transmit_queue
WHERE don_id = $1 AND server_url = $2
ORDER BY seq_nr DESC, transmission_hash DESC
LIMIT $3
)
`, o.donID, serverURL, maxSize)
if err != nil {
return fmt.Errorf("llo orm: failed to prune transmissions: %w", err)
}
return nil
}
89 changes: 89 additions & 0 deletions core/services/llo/mercurytransmitter/orm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package mercurytransmitter

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

var (
sURL = "wss://example.com/mercury"
sURL2 = "wss://mercuryserver.test"
sURL3 = "wss://mercuryserver.example/foo"
)

func TestORM(t *testing.T) {
ctx := testutils.Context(t)
db := pgtest.NewSqlxDB(t)

donID := uint32(654321)
orm := NewORM(db, donID)

t.Run("DonID", func(t *testing.T) {
assert.Equal(t, donID, orm.DonID())
})

transmissions := makeSampleTransmissions()[:2]

t.Run("Insert", func(t *testing.T) {
err := orm.Insert(ctx, transmissions)
require.NoError(t, err)
})
t.Run("Get", func(t *testing.T) {
result, err := orm.Get(ctx, sURL)
require.NoError(t, err)

assert.ElementsMatch(t, transmissions, result)

result, err = orm.Get(ctx, "other server url")
require.NoError(t, err)

assert.Empty(t, result)
})
t.Run("Delete", func(t *testing.T) {
err := orm.Delete(ctx, [][32]byte{transmissions[0].Hash()})
require.NoError(t, err)

result, err := orm.Get(ctx, sURL)
require.NoError(t, err)

require.Len(t, result, 1)
assert.Equal(t, transmissions[1], result[0])

err = orm.Delete(ctx, [][32]byte{transmissions[1].Hash()})
require.NoError(t, err)

result, err = orm.Get(ctx, sURL)
require.NoError(t, err)
require.Len(t, result, 0)
})
t.Run("Prune", func(t *testing.T) {
err := orm.Insert(ctx, transmissions)
require.NoError(t, err)

err = orm.Prune(ctx, sURL, 1)
require.NoError(t, err)

result, err := orm.Get(ctx, sURL)
require.NoError(t, err)
require.Len(t, result, 1)
assert.Equal(t, transmissions[1], result[0])

err = orm.Prune(ctx, sURL, 1)
require.NoError(t, err)
result, err = orm.Get(ctx, sURL)
require.NoError(t, err)
require.Len(t, result, 1)
assert.Equal(t, transmissions[1], result[0])

err = orm.Prune(ctx, sURL, 0)
require.NoError(t, err)
result, err = orm.Get(ctx, sURL)
require.NoError(t, err)
require.Len(t, result, 0)
})
}
Loading

0 comments on commit 356c70c

Please sign in to comment.