Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/cosmos: switch to sqlutil.DataSource #400

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions pkg/cosmos/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,18 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
bank "github.com/cosmos/cosmos-sdk/x/bank/types"

"github.com/jmoiron/sqlx"
"github.com/smartcontractkit/chainlink-common/pkg/chains"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/adapters"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/client"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/db"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/txm"

"github.com/smartcontractkit/chainlink-common/pkg/chains"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
)

// defaultRequestTimeout is the default Cosmos client timeout.
Expand All @@ -43,7 +42,7 @@ type Chain = adapters.Chain
// ChainOpts holds options for configuring a Chain.
type ChainOpts struct {
Logger logger.Logger
DB *sqlx.DB
DS sqlutil.DataSource
KeyStore loop.Keystore
}

Expand All @@ -54,8 +53,8 @@ func (o *ChainOpts) Validate() (err error) {
if o.Logger == nil {
err = multierr.Append(err, required("Logger'"))
}
if o.DB == nil {
err = multierr.Append(err, required("DB"))
if o.DS == nil {
err = multierr.Append(err, required("DataSource"))
}
if o.KeyStore == nil {
err = multierr.Append(err, required("KeyStore"))
Expand All @@ -67,7 +66,7 @@ func NewChain(cfg *config.TOMLConfig, opts ChainOpts) (adapters.Chain, error) {
if !cfg.IsEnabled() {
return nil, fmt.Errorf("cannot create new chain with ID %s, the chain is disabled", *cfg.ChainID)
}
c, err := newChain(*cfg.ChainID, cfg, opts.DB, opts.KeyStore, opts.Logger)
c, err := newChain(*cfg.ChainID, cfg, opts.DS, opts.KeyStore, opts.Logger)
if err != nil {
return nil, err
}
Expand All @@ -84,7 +83,7 @@ type chain struct {
lggr logger.Logger
}

func newChain(id string, cfg *config.TOMLConfig, db *sqlx.DB, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
func newChain(id string, cfg *config.TOMLConfig, ds sqlutil.DataSource, ks loop.Keystore, lggr logger.Logger) (*chain, error) {
lggr = logger.With(lggr, "cosmosChainID", id)
var ch = chain{
id: id,
Expand All @@ -101,7 +100,7 @@ func newChain(id string, cfg *config.TOMLConfig, db *sqlx.DB, ks loop.Keystore,
}, nil
}),
}, lggr)
ch.txm = txm.NewTxm(db, tc, *gpe, ch.id, cfg, ks, lggr)
ch.txm = txm.NewTxm(ds, tc, *gpe, ch.id, cfg, ks, lggr)

return &ch, nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/cosmos/txm/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ import (
// ORM manages the data model for cosmos tx management.
type ORM struct {
chainID string
db sqlutil.Queryer
ds sqlutil.DataSource
}

// NewORM creates an ORM scoped to chainID.
func NewORM(chainID string, db sqlutil.Queryer) *ORM {
func NewORM(chainID string, ds sqlutil.DataSource) *ORM {
return &ORM{
chainID: chainID,
db: db,
ds: ds,
}
}

func (o *ORM) Transaction(ctx context.Context, fn func(*ORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.db, nil, fn)
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by q.
Expand All @@ -37,7 +37,7 @@ func (o *ORM) new(q sqlutil.Queryer) *ORM { return NewORM(o.chainID, q) }
func (o *ORM) InsertMsg(ctx context.Context, contractID, typeURL string, msg []byte) (int64, error) {
var tm adapters.Msg

err := o.db.GetContext(ctx, &tm, `INSERT INTO cosmos_msgs (contract_id, type, raw, state, cosmos_chain_id, created_at, updated_at)
err := o.ds.GetContext(ctx, &tm, `INSERT INTO cosmos_msgs (contract_id, type, raw, state, cosmos_chain_id, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) RETURNING *`, contractID, typeURL, msg, db.Unstarted, o.chainID)
if err != nil {
return 0, err
Expand All @@ -47,7 +47,7 @@ func (o *ORM) InsertMsg(ctx context.Context, contractID, typeURL string, msg []b

// UpdateMsgsContract updates messages for the given contract.
func (o *ORM) UpdateMsgsContract(ctx context.Context, contractID string, from, to db.State) error {
_, err := o.db.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW()
_, err := o.ds.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW()
WHERE cosmos_chain_id = $2 AND contract_id = $3 AND state = $4`, to, o.chainID, contractID, from)
if err != nil {
return err
Expand All @@ -61,7 +61,7 @@ func (o *ORM) GetMsgsState(ctx context.Context, state db.State, limit int64) (ad
return adapters.Msgs{}, errors.New("limit must be greater than 0")
}
var msgs adapters.Msgs
if err := o.db.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE state = $1 AND cosmos_chain_id = $2 ORDER BY id ASC LIMIT $3`, state, o.chainID, limit); err != nil {
if err := o.ds.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE state = $1 AND cosmos_chain_id = $2 ORDER BY id ASC LIMIT $3`, state, o.chainID, limit); err != nil {
return nil, err
}
return msgs, nil
Expand All @@ -70,7 +70,7 @@ func (o *ORM) GetMsgsState(ctx context.Context, state db.State, limit int64) (ad
// GetMsgs returns any messages matching ids.
func (o *ORM) GetMsgs(ctx context.Context, ids ...int64) (adapters.Msgs, error) {
var msgs adapters.Msgs
if err := o.db.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE id = ANY($1)`, ids); err != nil {
if err := o.ds.SelectContext(ctx, &msgs, `SELECT * FROM cosmos_msgs WHERE id = ANY($1)`, ids); err != nil {
return nil, err
}
return msgs, nil
Expand All @@ -85,9 +85,9 @@ func (o *ORM) UpdateMsgs(ctx context.Context, ids []int64, state db.State, txHas
var res sql.Result
var err error
if state == db.Broadcasted {
res, err = o.db.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW(), tx_hash = $2 WHERE id = ANY($3)`, state, *txHash, ids)
res, err = o.ds.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW(), tx_hash = $2 WHERE id = ANY($3)`, state, *txHash, ids)
} else {
res, err = o.db.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW() WHERE id = ANY($2)`, state, ids)
res, err = o.ds.ExecContext(ctx, `UPDATE cosmos_msgs SET state = $1, updated_at = NOW() WHERE id = ANY($2)`, state, ids)
}
if err != nil {
return err
Expand Down
16 changes: 8 additions & 8 deletions pkg/cosmos/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"

wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
Expand All @@ -19,15 +18,16 @@ import (
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/x/bank/types"

"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/adapters"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/client"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/db"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/adapters"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/client"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/config"
"github.com/smartcontractkit/chainlink-cosmos/pkg/cosmos/db"
)

var (
Expand All @@ -49,11 +49,11 @@ type Txm struct {
}

// NewTxm creates a txm. Uses simulation so should only be used to send txes to trusted contracts i.e. OCR.
func NewTxm(db *sqlx.DB, tc func() (client.ReaderWriter, error), gpe client.ComposedGasPriceEstimator, chainID string, cfg config.Config, ks loop.Keystore, lggr logger.Logger) *Txm {
func NewTxm(ds sqlutil.DataSource, tc func() (client.ReaderWriter, error), gpe client.ComposedGasPriceEstimator, chainID string, cfg config.Config, ks loop.Keystore, lggr logger.Logger) *Txm {
keystoreAdapter := newKeystoreAdapter(ks, cfg.Bech32Prefix())
return &Txm{
newMsgs: make(chan struct{}, 1), // buffered to hold one pending request while unblocking callers
orm: NewORM(chainID, db),
orm: NewORM(chainID, ds),
lggr: logger.Sugared(lggr).Named("Txm"),
tc: tc,
keystoreAdapter: keystoreAdapter,
Expand Down
Loading