Skip to content

Commit

Permalink
[TECHDEBT] Consolidated PostgresContext and PostgresDB (#149) (#219)
Browse files Browse the repository at this point in the history
## Description

Tend to tech debt in the `persistence` module related to simplifying the `PostgresContext` struct.

## Issue

Fixes #149

## Type of change

Please mark the relevant option(s):

- [ ] New feature, functionality or library
- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] Code health or cleanup
- [ ] Major breaking change
- [ ] Documentation
- [ ] Other: <REPLACE_ME>

## List of changes

- Consolidates `PostgresContext` and `PostgresDB`
- Reduces scope of `Tx` and `BlockStore`
- Improve error handling in `NewFuzzTestPostgresContext`

## Testing

- [x] `make test_all`
- [ ] [LocalNet](https://github.com/pokt-network/pocket/blob/main/docs/development/README.md) w/ all of the steps outlined in the `README`

## Checklist

- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have tested my changes using the available tooling
- [ ] If applicable, I have made corresponding changes to related local or global README
- [ ] If applicable, I have added new diagrams using [mermaid.js](https://mermaid-js.github.io)
- [ ] If applicable, I have added tests that prove my fix is effective or that my feature works
  • Loading branch information
Olshansk authored Sep 23, 2022
1 parent 20d91ae commit 82df934
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 139 deletions.
3 changes: 2 additions & 1 deletion p2p/module_raintree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -288,7 +289,7 @@ func prepareBusMock(t *testing.T, wg *sync.WaitGroup, consensusMock *modulesMock

busMock.EXPECT().PublishEventToBus(gomock.Any()).Do(func(e *debug.PocketEvent) {
wg.Done()
fmt.Println("App specific bus mock publishing event to bus")
log.Println("App specific bus mock publishing event to bus")
}).MaxTimes(1) // Using `MaxTimes` rather than `Times` because originator node implicitly handles the message

busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes()
Expand Down
10 changes: 8 additions & 2 deletions persistence/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.0.0.5] - 2022-09-14

- Consolidated `PostgresContext` and `PostgresDb` into a single structure

## [0.0.0.4] - 2022-08-25

**Encapsulate structures previously in shared [#163](github.com/pokt-network/pocket/issues/163)**
- Renamed schema -> types

- Renamed schema -> types
- Added genesis, config, and unstaking proto files from shared
- Ensured proto structures implement shared interfaces
- Populate `PersistenceGenesisState` uses shared interfaces in order to accept `MockPersistenceGenesisState`
- Populate `PersistenceGenesisState` uses shared interfaces in order to accept `MockPersistenceGenesisState`
- ^ Same applies for `PersistenceConfig`
- Bumped cleanup TODOs to #149 due to scope size of #163

Expand Down
24 changes: 12 additions & 12 deletions persistence/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func (p PostgresContext) GetAccountAmount(address []byte, height int64) (amount
}

func (p PostgresContext) getAccountAmountStr(address string, height int64) (amount string, err error) {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return
}

amount = defaultAccountAmountStr
if err = txn.QueryRow(ctx, types.GetAccountAmountQuery(address, height)).Scan(&amount); err != pgx.ErrNoRows {
if err = tx.QueryRow(ctx, types.GetAccountAmountQuery(address, height)).Scan(&amount); err != pgx.ErrNoRows {
return
}

Expand All @@ -53,7 +53,7 @@ func (p PostgresContext) SubtractAccountAmount(address []byte, amount string) er
// DISCUSS(team): If we are okay with `GetAccountAmount` return 0 as a default, this function can leverage
// `operationAccountAmount` with `*orig = *delta` and make everything much simpler.
func (p PostgresContext) SetAccountAmount(address []byte, amount string) error {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return err
}
Expand All @@ -62,7 +62,7 @@ func (p PostgresContext) SetAccountAmount(address []byte, amount string) error {
return err
}
// DISCUSS(team): Do we want to panic if `amount < 0` here?
if _, err = txn.Exec(ctx, types.InsertAccountAmountQuery(hex.EncodeToString(address), amount, height)); err != nil {
if _, err = tx.Exec(ctx, types.InsertAccountAmountQuery(hex.EncodeToString(address), amount, height)); err != nil {
return err
}
return nil
Expand All @@ -76,28 +76,28 @@ func (p *PostgresContext) operationAccountAmount(address []byte, deltaAmount str

// TODO(andrew): remove address param
func (p PostgresContext) InsertPool(name string, address []byte, amount string) error {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return err
}
height, err := p.GetHeight()
if err != nil {
return err
}
if _, err = txn.Exec(ctx, types.InsertPoolAmountQuery(name, amount, height)); err != nil {
if _, err = tx.Exec(ctx, types.InsertPoolAmountQuery(name, amount, height)); err != nil {
return err
}
return nil
}

func (p PostgresContext) GetPoolAmount(name string, height int64) (amount string, err error) {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return
}

amount = defaultAccountAmountStr
if err = txn.QueryRow(ctx, types.GetPoolAmountQuery(name, height)).Scan(&amount); err != pgx.ErrNoRows {
if err = tx.QueryRow(ctx, types.GetPoolAmountQuery(name, height)).Scan(&amount); err != pgx.ErrNoRows {
return
}

Expand All @@ -122,15 +122,15 @@ func (p PostgresContext) SubtractPoolAmount(name string, amount string) error {
// `operationPoolAmount` with `*orig = *delta` and make everything much simpler.
// DISCUSS(team): Do we have a use-case for this function?
func (p PostgresContext) SetPoolAmount(name string, amount string) error {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return err
}
height, err := p.GetHeight()
if err != nil {
return err
}
if _, err = txn.Exec(ctx, types.InsertPoolAmountQuery(name, amount, height)); err != nil {
if _, err = tx.Exec(ctx, types.InsertPoolAmountQuery(name, amount, height)); err != nil {
return err
}
return nil
Expand All @@ -144,7 +144,7 @@ func (p *PostgresContext) operationPoolOrAccAmount(name, amount string,
op func(*big.Int, *big.Int) error,
getAmount func(string, int64) (string, error),
insert func(name, amount string, height int64) string) error {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return err
}
Expand All @@ -167,7 +167,7 @@ func (p *PostgresContext) operationPoolOrAccAmount(name, amount string,
if err := op(originalAmountBig, amountBig); err != nil {
return err
}
if _, err = txn.Exec(ctx, insert(name, types.BigIntToString(originalAmountBig), height)); err != nil {
if _, err = tx.Exec(ctx, insert(name, types.BigIntToString(originalAmountBig), height)); err != nil {
return err
}
return nil
Expand Down
15 changes: 8 additions & 7 deletions persistence/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,31 @@ package persistence
import (
"encoding/binary"
"encoding/hex"
"github.com/pokt-network/pocket/persistence/types"
"log"

"github.com/pokt-network/pocket/persistence/types"
)

// OPTIMIZE(team): get from blockstore or keep in memory
func (p PostgresContext) GetLatestBlockHeight() (latestHeight uint64, err error) {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return 0, err
}

err = txn.QueryRow(ctx, types.GetLatestBlockHeightQuery()).Scan(&latestHeight)
err = tx.QueryRow(ctx, types.GetLatestBlockHeightQuery()).Scan(&latestHeight)
return
}

// OPTIMIZE(team): get from blockstore or keep in cache/memory
func (p PostgresContext) GetBlockHash(height int64) ([]byte, error) {
ctx, txn, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return nil, err
}

var hexHash string
err = txn.QueryRow(ctx, types.GetBlockHashQuery(height)).Scan(&hexHash)
err = tx.QueryRow(ctx, types.GetBlockHashQuery(height)).Scan(&hexHash)
if err != nil {
return nil, err
}
Expand All @@ -52,11 +53,11 @@ func (p PostgresContext) StoreBlock(blockProtoBytes []byte) error {
// INVESTIGATE: Note that we are writing this directly to the blockStore. Depending on how
// the use of the PostgresContext evolves, we may need to write this to `ContextStore` and copy
// over to `BlockStore` when the block is committed.
return p.DB.Blockstore.Put(heightToBytes(p.Height), blockProtoBytes)
return p.blockstore.Put(heightToBytes(p.Height), blockProtoBytes)
}

func (p PostgresContext) InsertBlock(height uint64, hash string, proposerAddr []byte, quorumCert []byte) error {
ctx, tx, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions persistence/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (p PostgresContext) NewSavePoint(bytes []byte) error {

func (p PostgresContext) RollbackToSavePoint(bytes []byte) error {
log.Println("TODO: RollbackToSavePoint not fully implemented")
return p.DB.Tx.Rollback(context.TODO())
return p.GetTx().Rollback(context.TODO())
}

func (p PostgresContext) AppHash() ([]byte, error) {
Expand All @@ -28,10 +28,10 @@ func (p PostgresContext) Commit() error {
log.Printf("About to commit context at height %d.\n", p.Height)

ctx := context.TODO()
if err := p.DB.Tx.Commit(context.TODO()); err != nil {
if err := p.GetTx().Commit(context.TODO()); err != nil {
return err
}
if err := p.DB.conn.Close(ctx); err != nil {
if err := p.conn.Close(ctx); err != nil {
log.Println("[TODO][ERROR] Implement connection pooling. Error when closing DB connecting...", err)

}
Expand All @@ -42,10 +42,10 @@ func (p PostgresContext) Release() error {
log.Printf("About to release context at height %d.\n", p.Height)

ctx := context.TODO()
if err := p.DB.Tx.Rollback(ctx); err != nil {
if err := p.GetTx().Rollback(ctx); err != nil {
return err
}
if err := p.DB.conn.Close(ctx); err != nil {
if err := p.conn.Close(ctx); err != nil {
log.Println("[TODO][ERROR] Implement connection pooling. Error when closing DB connecting...", err)
}
return nil
Expand All @@ -54,5 +54,5 @@ func (p PostgresContext) Release() error {
func (p PostgresContext) Close() error {
log.Printf("About to close context at height %d.\n", p.Height)

return p.DB.conn.Close(context.TODO())
return p.conn.Close(context.TODO())
}
26 changes: 9 additions & 17 deletions persistence/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"

"github.com/pokt-network/pocket/persistence/types"

"github.com/jackc/pgconn"
Expand Down Expand Up @@ -34,28 +35,19 @@ var protocolActorSchemas = []types.ProtocolActorSchema{

var _ modules.PersistenceRWContext = &PostgresContext{}

// TODO(pocket/issues/149): Consolidate `PostgresContext and PostgresDB` into a single struct and
// avoid exposing it for testing purposes after the consolidation. A helper
// with default context values should be created.
// TODO: These are only externalized for testing purposes, so they should be made private and
// it is trivial to create a helper to initial a context with some values.
type PostgresContext struct {
Height int64
DB PostgresDB
}
type PostgresDB struct {
Height int64 // TODO(olshansky): `Height` is only externalized for testing purposes. Replace with helpers...
conn *pgx.Conn
Tx pgx.Tx
Blockstore kvstore.KVStore
tx pgx.Tx
blockstore kvstore.KVStore
}

func (pg *PostgresDB) GetCtxAndTxn() (context.Context, pgx.Tx, error) {
tx, err := pg.GetTxn()
return context.TODO(), tx, err
func (pg *PostgresContext) GetCtxAndTx() (context.Context, pgx.Tx, error) {
return context.TODO(), pg.GetTx(), nil
}

func (pg *PostgresDB) GetTxn() (pgx.Tx, error) {
return pg.Tx, nil
func (pg *PostgresContext) GetTx() pgx.Tx {
return pg.tx
}

func (pg *PostgresContext) GetCtx() (context.Context, error) {
Expand Down Expand Up @@ -171,7 +163,7 @@ func initializeBlockTables(ctx context.Context, db *pgx.Conn) error {

// Exposed for testing purposes only
func (p PostgresContext) DebugClearAll() error {
ctx, tx, err := p.DB.GetCtxAndTxn()
ctx, tx, err := p.GetCtxAndTx()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 82df934

Please sign in to comment.