Skip to content

Commit

Permalink
pkg/cosmos: switch to sqlutil.DataSource
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 19, 2024
1 parent 73d1485 commit f165c77
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 31 deletions.
25 changes: 12 additions & 13 deletions pkg/cosmos/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,18 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"

"github.com/jmoiron/sqlx"
"github.com/smartcontractkit/chainlink-common/pkg/chains"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/adapters"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/client"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/db"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/txm"

"github.com/smartcontractkit/chainlink-common/pkg/chains"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

// defaultRequestTimeout is the default Cosmos client timeout.
Expand All @@ -43,7 +42,7 @@ type Chain = adapters.Chain
// ChainOpts holds options for configuring a Chain.
type ChainOpts struct {
Logger logger.Logger
DB *sqlx.DB
DS sqlutil.DataSource
KeyStore loop.Keystore
}

Expand All @@ -54,8 +53,8 @@ func (o *ChainOpts) Validate() (err error) {
if o.Logger == nil {
err = multierr.Append(err, required("Logger'"))
}
if o.DB == nil {
err = multierr.Append(err, required("DB"))
if o.DS == nil {
err = multierr.Append(err, required("DataSource"))
}
if o.KeyStore == nil {
err = multierr.Append(err, required("KeyStore"))
Expand All @@ -67,7 +66,7 @@ func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (adapters.Chain, error) {
if !cfg.IsEnabled() {
return nil, fmt.Errorf("cannot create new chain with ID %s, the chain is disabled", *cfg.ChainID)
}
c, err := newChain(*cfg.ChainID, cfg, opts.DB, opts.KeyStore, opts.Logger)
c, err := newChain(*cfg.ChainID, cfg, opts.DS, opts.KeyStore, opts.Logger)
if err != nil {
return nil, err
}
Expand All @@ -84,7 +83,7 @@ type chain struct {
lggr logger.Logger
}

func newChain(id string, cfg *config.TOMLConfig, db *sqlx.DB, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
func newChain(id string, cfg *config.TOMLConfig, ds sqlutil.DataSource, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
lggr = logger.With(lggr, "cosmosChainID", id)
var ch = chain{
id: id,
Expand All @@ -101,7 +100,7 @@ func newChain(id string, cfg *config.TOMLConfig, db *sqlx.DB, ks loop.Keystore,
}, nil
}),
}, lggr)
ch.txm = txm.NewTxm(db, tc, *gpe, ch.id, cfg, ks, lggr)
ch.txm = txm.NewTxm(ds, tc, *gpe, ch.id, cfg, ks, lggr)

return &ch, nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/cosmos/txm/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import (
// ORM manages the data model for cosmos tx management.
type ORM struct {
chainID string
db sqlutil.Queryer
ds sqlutil.DataSource
}

// NewORM creates an ORM scoped to chainID.
func NewORM(chainID string, db sqlutil.Queryer) *ORM {
func NewORM(chainID string, ds sqlutil.DataSource) *ORM {
return &ORM{
chainID: chainID,
db: db,
ds: ds,
}
}

func (o *ORM) Transaction(ctx context.Context, fn func(*ORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.db, nil, fn)
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by q.
Expand All @@ -37,7 +37,7 @@ func (o *ORM) new(q sqlutil.Queryer) *ORM { return NewORM(o.chainID, q) }
func (o *ORM) InsertMsg(ctx context.Context, contractID, typeURL string, msg []byte) (int64, error) {
var tm adapters.Msg

err := o.db.GetContext(ctx, &tm, `INSERT INTO cosmos_msgs (contract_id, type, raw, state, cosmos_chain_id, created_at, updated_at)
err := o.ds.GetContext(ctx, &tm, `INSERT INTO cosmos_msgs (contract_id, type, raw, state, cosmos_chain_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) RETURNING *`, contractID, typeURL, msg, db.Unstarted, o.chainID)
if err != nil {
return 0, err
Expand All @@ -47,7 +47,7 @@ func (o *ORM) InsertMsg(ctx context.Context, contractID, typeURL string, msg []b

// UpdateMsgsContract updates messages for the given contract.
func (o *ORM) UpdateMsgsContract(ctx context.Context, contractID string, from, to db.State) error {
_, err := o.db.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW()
_, err := o.ds.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW()
WHERE cosmos_chain_id = $2 AND contract_id = $3 AND state = $4`, to, o.chainID, contractID, from)
if err != nil {
return err
Expand All @@ -61,7 +61,7 @@ func (o *ORM) GetMsgsState(ctx context.Context, state db.State, limit int64) (ad
return adapters.Msgs{}, errors.New("limit must be greater than 0")
}
var msgs adapters.Msgs
if err := o.db.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE state = $1 AND cosmos_chain_id = $2 ORDER BY id ASC LIMIT $3`, state, o.chainID, limit); err != nil {
if err := o.ds.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE state = $1 AND cosmos_chain_id = $2 ORDER BY id ASC LIMIT $3`, state, o.chainID, limit); err != nil {
return nil, err
}
return msgs, nil
Expand All @@ -70,7 +70,7 @@ func (o *ORM) GetMsgsState(ctx context.Context, state db.State, limit int64) (ad
// GetMsgs returns any messages matching ids.
func (o *ORM) GetMsgs(ctx context.Context, ids ...int64) (adapters.Msgs, error) {
var msgs adapters.Msgs
if err := o.db.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE id = ANY($1)`, ids); err != nil {
if err := o.ds.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE id = ANY($1)`, ids); err != nil {
return nil, err
}
return msgs, nil
Expand All @@ -85,9 +85,9 @@ func (o *ORM) UpdateMsgs(ctx context.Context, ids []int64, state db.State, txHas
var res sql.Result
var err error
if state == db.Broadcasted {
res, err = o.db.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW(), tx_hash = $2 WHERE id = ANY($3)`, state, *txHash, ids)
res, err = o.ds.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW(), tx_hash = $2 WHERE id = ANY($3)`, state, *txHash, ids)
} else {
res, err = o.db.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW() WHERE id = ANY($2)`, state, ids)
res, err = o.ds.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW() WHERE id = ANY($2)`, state, ids)
}
if err != nil {
return err
Expand Down
16 changes: 8 additions & 8 deletions pkg/cosmos/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"

wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
Expand All @@ -19,15 +18,16 @@ import (
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/x/bank/types"

"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/adapters"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/client"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/db"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/adapters"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/client"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/db"
)

var (
Expand All @@ -49,11 +49,11 @@ type Txm struct {
}

// NewTxm creates a txm. Uses simulation so should only be used to send txes to trusted contracts i.e. OCR.
func NewTxm(db *sqlx.DB, tc func() (client.ReaderWriter, error), gpe client.ComposedGasPriceEstimator, chainID string, cfg config.Config, ks loop.Keystore, lggr logger.Logger) *Txm {
func NewTxm(ds sqlutil.DataSource, tc func() (client.ReaderWriter, error), gpe client.ComposedGasPriceEstimator, chainID string, cfg config.Config, ks loop.Keystore, lggr logger.Logger) *Txm {
keystoreAdapter := newKeystoreAdapter(ks, cfg.Bech32Prefix())
return &Txm{
newMsgs: make(chan struct{}, 1), // buffered to hold one pending request while unblocking callers
orm: NewORM(chainID, db),
orm: NewORM(chainID, ds),
lggr: logger.Sugared(lggr).Named("Txm"),
tc: tc,
keystoreAdapter: keystoreAdapter,
Expand Down

0 comments on commit f165c77

Please sign in to comment.