Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #52 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
Improved pubsub with always publish attempt
  • Loading branch information
itzmeanjan authored Jan 27, 2021
2 parents 2d61a8b + 2937977 commit 5fe5c25
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 62 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,5 @@ graphql_gen:
build:
	go build -o ette

# `run` depends on `build`, so the binary is always rebuilt before launching
run: build
	./ette
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ And that's `ette`

- Make sure you've Go _( >= 1.15 )_ installed
- You need to also install & set up PostgreSQL. I found [this](https://www.digitalocean.com/community/tutorials/how-to-install-and-use-postgresql-on-ubuntu-20-04) guide helpful.

> Make sure you've `pgcrypto` extension enabled on PostgreSQL Database.
> Check existing extensions using : `\dx`
> Create extension using : `create extension pgcrypto;`
- Redis needs to be installed too. Consider following [this](https://www.digitalocean.com/community/tutorials/how-to-install-and-secure-redis-on-ubuntu-20-04) guide.

> Note : Redis **v6.0.6** is required
Expand Down Expand Up @@ -101,7 +108,7 @@ cd ette
- Skipping `RedisPassword` is absolutely fine, if you don't want to use any password in Redis instance. [ **Not recommended** ]
- Replace `Domain` with your domain name i.e. `ette.company.com`
- Set `Production` to `yes` before running it in production; otherwise you can simply skip it
- `ette` can be run in any of 5 possible modes, which can be set by `EtteMode`
- `ette` can be run in any of 👇 5 possible modes, which can be set by `EtteMode`

```json
{
Expand Down Expand Up @@ -532,6 +539,8 @@ type Event {

### Real time notification for mined blocks ⛏

![pubsub-ette](./sc/pubsub-ette.png)

For listening to blocks getting mined, connect to `/v1/ws` endpoint using a websocket client library & once connected, you need to send a **subscription** request with 👇 payload _( JSON encoded )_

```json
Expand Down
5 changes: 4 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func bootstrap(configFile, subscriptionPlansFile string) (*d.BlockChainNodeConne
graph.GetDatabaseConnection(_db)

_status := &d.StatusHolder{
State: &d.SyncState{BlockCountAtStartUp: db.GetBlockCount(_db)},
State: &d.SyncState{
BlockCountAtStartUp: db.GetBlockCount(_db),
MaxBlockNumberAtStartUp: db.GetCurrentBlockNumber(_db),
},
Mutex: &sync.RWMutex{},
}

Expand Down
12 changes: 0 additions & 12 deletions app/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,6 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm
// we're exiting from this context, while putting this block number in retry queue
if !(result.Failure == 0) {

// If it's being run in mode 2, no need to put it in retry queue
//
// We can miss blocks, but will not be able deliver it over websocket channel
// to subscribed clients
//
// @todo This needs to be improved, so that even if we miss a block now
// we get to process & publish it over websocket based channel, where
// clients subscribe for real-time data
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
return
}

PushBlockIntoRetryQueue(redis, block.Number().String())
return

Expand Down
18 changes: 2 additions & 16 deletions app/block/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/gookit/color"
cfg "github.com/itzmeanjan/ette/app/config"
d "github.com/itzmeanjan/ette/app/data"
"github.com/itzmeanjan/ette/app/db"
"gorm.io/gorm"
Expand All @@ -25,19 +24,6 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string,

block, err := client.BlockByHash(context.Background(), hash)
if err != nil {

// If it's being run in mode 2, no need to put it in retry queue
//
// We can miss blocks, but will not be able deliver it over websocket channel
// to subscribed clients
//
// @todo This needs to be improved, so that even if we miss a block now
// we get to process & publish it over websocket based channel, where
// clients subscribe for real-time data
if !(cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3") {
return
}

// Pushing block number into Redis queue for retrying later
PushBlockIntoRetryQueue(redis, number)

Expand All @@ -50,7 +36,7 @@ func FetchBlockByHash(client *ethclient.Client, hash common.Hash, number string,
}

// FetchBlockByNumber - Fetching block content using block number
func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, _status *d.StatusHolder) {
func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, redis *d.RedisInfo, publishable bool, _status *d.StatusHolder) {

// Starting block processing at
startingAt := time.Now().UTC()
Expand All @@ -67,7 +53,7 @@ func FetchBlockByNumber(client *ethclient.Client, number uint64, _db *gorm.DB, r
return
}

ProcessBlockContent(client, block, _db, redis, false, _status, startingAt)
ProcessBlockContent(client, block, _db, redis, publishable, _status, startingAt)

}

Expand Down
22 changes: 10 additions & 12 deletions app/block/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/gookit/color"
cfg "github.com/itzmeanjan/ette/app/config"
d "github.com/itzmeanjan/ette/app/data"
"github.com/itzmeanjan/ette/app/db"
"gorm.io/gorm"
)

Expand All @@ -28,9 +27,6 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
// Scheduling unsubscribe, to be executed when end of this execution scope is reached
defer subs.Unsubscribe()

// Last time `ette` stopped syncing here
currentHighestBlockNumber := db.GetCurrentBlockNumber(_db)

// Flag to check for whether this is first time block header being received or not
//
// If yes, we'll start syncer to fetch all block in range (last block processed, latest block)
Expand Down Expand Up @@ -58,24 +54,25 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
// Starting now, to be used for calculating system performance, uptime etc.
status.SetStartedAt()

// Starting go routine for fetching blocks `ette` failed to process in previous attempt
//
// Uses Redis backed queue for fetching pending block hash & retries
go RetryQueueManager(connection.RPC, _db, redis, status)

// If historical data query features are enabled
// only then we need to sync to latest state of block chain
if cfg.Get("EtteMode") == "1" || cfg.Get("EtteMode") == "3" {

// Starting syncer in another thread, where it'll keep fetching
// blocks from highest block number it fetched last time to current network block number
// i.e. trying to fill up gap, which was caused when `ette` was offline
//
// Backward traversal mechanism gives us more recent blockchain happenings to cover
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, currentHighestBlockNumber, status)

// Starting go routine for fetching blocks `ette` failed to process in previous attempt
//
// Uses Redis backed queue for fetching pending block hash & retries
go RetryQueueManager(connection.RPC, _db, redis, status)
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, status.MaxBlockNumberAtStartUp(), status)

// Making sure on when next latest block header is received, it'll not
// start another syncer
}
// Making sure that when next latest block header is received, it'll not
// start another syncer
first = false

}
Expand Down Expand Up @@ -127,6 +124,7 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
_oldestBlock,
_db,
redis,
false,
status)

})
Expand Down
10 changes: 10 additions & 0 deletions app/block/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func PublishBlock(block *db.PackedBlock, redis *d.RedisInfo) {
return
}

log.Printf(color.LightMagenta.Sprintf("[*] Published block %d", block.Block.Number))

PublishTxs(block.Block.Number, block.Transactions, redis)

}
Expand All @@ -46,10 +48,18 @@ func PublishTxs(blockNumber uint64, txs []*db.PackedTransaction, redis *d.RedisI
return
}

var eventCount uint64

for _, t := range txs {
PublishTx(blockNumber, t, redis)

// how many events are present in this block, in total
eventCount += uint64(len(t.Events))
}

log.Printf(color.LightMagenta.Sprintf("[*] Published %d transactions of block %d", len(txs), blockNumber))
log.Printf(color.LightMagenta.Sprintf("[*] Published %d events of block %d", eventCount, blockNumber))

}

// PublishTx - Publishes tx & events in tx, related data to respective
Expand Down
20 changes: 14 additions & 6 deletions app/block/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,24 @@ func RetryQueueManager(client *ethclient.Client, _db *gorm.DB, redis *data.Redis
// which will be picked up & processed
//
// This will stop us from blindly creating too many go routines
func(blockNumber uint64) {
func(_blockNumber uint64) {

wp.Submit(func() {

FetchBlockByNumber(client,
parsedBlockNumber,
_db,
redis,
status)
// This check helps us in determining whether we should
// consider sending notification over pubsub channel for this block
// whose processing failed due to some reasons in last attempt
if status.MaxBlockNumberAtStartUp() <= _blockNumber {

FetchBlockByNumber(client, _blockNumber, _db, redis, true, status)
return

}

FetchBlockByNumber(client, _blockNumber, _db, redis, false, status)

})

}(parsedBlockNumber)
}
}
Expand Down
4 changes: 2 additions & 2 deletions app/block/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func SyncBlocksByRange(client *ethclient.Client, _db *gorm.DB, redis *data.Redis

}

FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, j.Status)

})
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func SyncMissingBlocksInDB(client *ethclient.Client, _db *gorm.DB, redis *data.R
block := db.GetBlock(j.DB, j.Block)
if block == nil && !CheckBlockInRetryQueue(redis, fmt.Sprintf("%d", j.Block)) {
// If not found, block fetching cycle is run, for this block
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, j.Status)
FetchBlockByNumber(j.Client, j.Block, j.DB, j.Redis, false, j.Status)
}

})
Expand Down
23 changes: 18 additions & 5 deletions app/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (

// SyncState - Whether `ette` is synced with blockchain or not
type SyncState struct {
	Done                    uint64
	StartedAt               time.Time
	BlockCountAtStartUp     uint64 // blocks present in DB when `ette` started
	MaxBlockNumberAtStartUp uint64 // highest block number in DB when `ette` started
	NewBlocksInserted       uint64
	LatestBlockNumber       uint64
}

// BlockCountInDB - Blocks currently present in database
Expand All @@ -36,6 +37,18 @@ type StatusHolder struct {
Mutex *sync.RWMutex
}

// MaxBlockNumberAtStartUp - Thread-safe read of the highest block number
// present in DB when `ette` started; used elsewhere to decide whether a
// block recovered via the retry queue should also be published on pubsub.
func (s *StatusHolder) MaxBlockNumberAtStartUp() uint64 {

	s.Mutex.RLock()
	maxBlockNumber := s.State.MaxBlockNumberAtStartUp
	s.Mutex.RUnlock()

	return maxBlockNumber

}

// SetStartedAt - Sets started at time
func (s *StatusHolder) SetStartedAt() {

Expand Down
2 changes: 1 addition & 1 deletion app/db/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (u *Users) ToJSON() []byte {
// This is to be used for controlling client application's access
// to resources they're requesting
type DeliveryHistory struct {
ID uint64 `gorm:"column:id;type:bigserial;primaryKey"`
ID string `gorm:"column:id;type:uuid;default:gen_random_uuid();primaryKey"`
Client string `gorm:"column:client;type:char(42);not null;index"`
TimeStamp time.Time `gorm:"column:ts;type:timestamp;not null;index:,sort:asc"`
EndPoint string `gorm:"column:endpoint;type:varchar(100);not null"`
Expand Down
1 change: 0 additions & 1 deletion app/pubsub/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ func (b *BlockConsumer) SendData(data interface{}) bool {
return false
}

log.Printf("[+] Delivered `block` data to client\n")
return true

}
Expand Down
2 changes: 1 addition & 1 deletion app/pubsub/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func (e *EventConsumer) SendData(data interface{}) bool {
return false
}

log.Printf("[+] Delivered `event` data to client\n")
return true

}

// Unsubscribe - Unsubscribe from event data publishing topic, to be called
Expand Down
2 changes: 1 addition & 1 deletion app/pubsub/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (t *TransactionConsumer) SendData(data interface{}) bool {
return false
}

log.Printf("[+] Delivered `transaction` data to client\n")
return true

}

// Unsubscribe - Unsubscribe from transactions pubsub topic, which client has subscribed to
Expand Down
2 changes: 1 addition & 1 deletion db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ create table users (
create index on users(address);

create table delivery_history (
id bigserial primary key,
id uuid default gen_random_uuid() primary key,
client char(42) not null,
ts timestamp not null,
endpoint varchar(100) not null,
Expand Down
Binary file added sc/pubsub-ette.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 5fe5c25

Please sign in to comment.