Skip to content
This repository has been archived by the owner on Jan 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #55 from itzmeanjan/develop
Browse files Browse the repository at this point in the history
More graphQL query methods & more 👇
  • Loading branch information
itzmeanjan authored Feb 18, 2021
2 parents 3c448ed + ee91d9a commit b53e479
Show file tree
Hide file tree
Showing 17 changed files with 1,799 additions and 311 deletions.
74 changes: 73 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ curl -s localhost:7000/v1/synced | jq
}
```

- You can check how many active websocket sessions being managed by your `ette` deployment by


```bash
curl -s localhost:7000/v1/stat | jq
```

---

### Production deployment of `ette` using **systemd**
Expand Down Expand Up @@ -454,6 +461,15 @@ type Block {
}
```

Method | Parameters | Possible use case
--- | --- | ---
`blockByHash` | hash: String! | When you know block hash & want to get whole block data back
`blockByNumber` | number: String! | When you know block number & want to get whole block data back
`blocksByNumberRange` | from: String!, to: String! | When you've a block number range & want to get all blocks in that range, in a single call
`blocksByTimeRange` | from: String!, to: String! | When you've unix timestamp range & want to get all blocks in that range, in a single call

---

### Historical Transaction Data ( GraphQL API ) 🤩

You can query transaction data from `ette`, using following GraphQL methods.
Expand All @@ -464,15 +480,32 @@ You can query transaction data from `ette`, using following GraphQL methods.

```graphql
type Query {
transaction(hash: String!): Transaction!

transactionCountByBlockHash(hash: String!): Int!
transactionsByBlockHash(hash: String!): [Transaction!]!

transactionCountByBlockNumber(number: String!): Int!
transactionsByBlockNumber(number: String!): [Transaction!]!
transaction(hash: String!): Transaction!

transactionCountFromAccountByNumberRange(account: String!, from: String!, to: String!): Int!
transactionsFromAccountByNumberRange(account: String!, from: String!, to: String!): [Transaction!]!

transactionCountFromAccountByTimeRange(account: String!, from: String!, to: String!): Int!
transactionsFromAccountByTimeRange(account: String!, from: String!, to: String!): [Transaction!]!

transactionCountToAccountByNumberRange(account: String!, from: String!, to: String!): Int!
transactionsToAccountByNumberRange(account: String!, from: String!, to: String!): [Transaction!]!

transactionCountToAccountByTimeRange(account: String!, from: String!, to: String!): Int!
transactionsToAccountByTimeRange(account: String!, from: String!, to: String!): [Transaction!]!

transactionCountBetweenAccountsByNumberRange(fromAccount: String!, toAccount: String!, from: String!, to: String!): Int!
transactionsBetweenAccountsByNumberRange(fromAccount: String!, toAccount: String!, from: String!, to: String!): [Transaction!]!

transactionCountBetweenAccountsByTimeRange(fromAccount: String!, toAccount: String!, from: String!, to: String!): Int!
transactionsBetweenAccountsByTimeRange(fromAccount: String!, toAccount: String!, from: String!, to: String!): [Transaction!]!

contractsCreatedFromAccountByNumberRange(account: String!, from: String!, to: String!): [Transaction!]!
contractsCreatedFromAccountByTimeRange(account: String!, from: String!, to: String!): [Transaction!]!
transactionFromAccountWithNonce(account: String!, nonce: String!): Transaction!
Expand All @@ -498,6 +531,31 @@ type Transaction {
}
```

Method | Parameters | Possible use case
--- | --- | ---
`transaction` | hash: String! | When you know txHash & want to get that tx data
`transactionCountByBlockHash` | hash: String! | When you know block hash & want to get count of tx(s) packed in that block
`transactionsByBlockHash` | hash: String! | When you know block hash & want to get all tx(s) packed in that block
`transactionCountByBlockNumber` | number: String! | When you know block number & want to get count of tx(s) packed in that block
`transactionsByBlockNumber` | number: String! | When you know block number & want to get all tx(s) packed in that block
`transactionCountFromAccountByNumberRange` | account: String!, from: String!, to: String! | When you know tx sender address, block number range & want to find out how many tx(s) were sent by this address in that certain block number range
`transactionsFromAccountByNumberRange` | account: String!, from: String!, to: String! | When you know tx sender address, block number range & want to find out all tx(s) that were sent by this address in that certain block number range
`transactionCountFromAccountByTimeRange` | account: String!, from: String!, to: String! | When you know tx sender address, unix time stamp range & want to find out how many tx(s) were sent by this address in that certain timespan
`transactionsFromAccountByTimeRange` | account: String!, from: String!, to: String! | When you know tx sender address, unix time stamp range & want to find out all tx(s) that were sent by this address in that certain timespan
`transactionCountToAccountByNumberRange` | account: String!, from: String!, to: String! | When you know tx receiver address, block number range & want to find out how many tx(s) were sent to this address in that certain block number range
`transactionsToAccountByNumberRange` | account: String!, from: String!, to: String! | When you know tx receiver address, block number range & want to find out all tx(s) that were sent to this address in that certain block number range
`transactionCountToAccountByTimeRange` | account: String!, from: String!, to: String! | When you know tx receiver address, unix time stamp range & want to find out how many tx(s) were sent to this address in that certain timespan
`transactionsToAccountByTimeRange` | account: String!, from: String!, to: String! | When you know tx receiver address, unix time stamp range & want to find out all tx(s) that were sent to this address in that certain timespan
`transactionCountBetweenAccountsByNumberRange` | fromAccount: String!, toAccount: String!, from: String!, to: String! | When you know tx sender & receiver addresses, block number range & want to find out how many tx(s) were sent from sender to receiver in that certain block number range
`transactionsBetweenAccountsByNumberRange` | fromAccount: String!, toAccount: String!, from: String!, to: String! | When you know tx sender & receiver addresses, block number range & want to find out all tx(s) that were sent from sender to receiver in that certain block number range
`transactionCountBetweenAccountsByTimeRange` | fromAccount: String!, toAccount: String!, from: String!, to: String! | When you know tx sender & receiver addresses, unix timestamp range & want to find out how many tx(s) were sent from sender to receiver in that certain timespan
`transactionsBetweenAccountsByTimeRange` | fromAccount: String!, toAccount: String!, from: String!, to: String! | When you know tx sender & receiver addresses, unix timestamp range & want to find out all tx(s) that were sent from sender to receiver in that certain timespan
`contractsCreatedFromAccountByNumberRange` | account: String!, from: String!, to: String! | When you know EOA's _( externally owned account )_ address & want to find out all contracts created by that account in block number range
`contractsCreatedFromAccountByTimeRange` | account: String!, from: String!, to: String! | When you know EOA's _( externally owned account )_ address & want to find out all contracts created by that account in certain time span
`transactionFromAccountWithNonce` | account: String!, nonce: String! | When you have EOA's address & nonce value of it, you can pin point to that tx. This can be used to iterate through all tx(s) from this account, by updating nonce.

---

### Historical Event Data ( GraphQL API ) 🤩

You can ask `ette` for event data using GraphQL API.
Expand Down Expand Up @@ -533,6 +591,20 @@ type Event {
}
```

Method | Parameters | Possible use case
--- | --- | ---
`eventsFromContractByNumberRange` | contract: String!, from: String!, to: String! | When you've one contract address, block number range & you want to find out all events emitted by that contract in given block range
`eventsFromContractByTimeRange` | contract: String!, from: String!, to: String! | When you know contract address, unix time stamp range & you want to find out all events emitted by that contract in given timespan
`eventsByBlockHash` | hash: String! | When you've block hash & want to find out all events emitted in tx(s) packed in that block
`eventsByTxHash` | hash: String! | When you've txHash & want to find out all events emitted during execution of that tx
`eventsFromContractWithTopicsByNumberRange` | contract: String!, from: String!, to: String!, topics: [String!]! | When you've smart contract address, block number range & an ordered list of event log's topic signature(s), you can find out all events emitted by that contract with specific signature(s) in block range
`eventsFromContractWithTopicsByTimeRange` | contract: String!, from: String!, to: String!, topics: [String!]! | When you've smart contract address, unix time stamp range & an ordered list of event log's topic signature(s), you can find out all events emitted by that contract with specific signature(s) in given timespan
`lastXEventsFromContract` | contract: String!, x: Int! | When you know just contract address & want to find out last **X** events emitted by that contract **[ Very useful sometimes 😅 ]**
`eventByBlockHashAndLogIndex` | hash: String!, index: String! | When you know block hash, index of event log in block & want to get back specific event in that position
`eventByBlockHashAndLogIndex` | number: String!, index: String! | When you know block number, index of event log in block & want to get back specific event in that position

---

> Browser based GraphQL Playground : **/v1/graphql-playground** 👇🤩

![graphql_playground](./sc/graphQL_playground.png)
Expand Down
5 changes: 5 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/itzmeanjan/ette/app/db"
"github.com/itzmeanjan/ette/app/rest"
"github.com/itzmeanjan/ette/app/rest/graph"
srv "github.com/itzmeanjan/ette/app/services"
ss "github.com/itzmeanjan/ette/app/snapshot"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -160,6 +161,10 @@ func Run(configFile, subscriptionPlansFile string) {
// Pushing block header propagation listener to another thread of execution
go blk.SubscribeToNewBlocks(_connection, _db, _status, &_redisInfo)

// Periodic clean up job being started, to be run every 24 hours to clean up
// delivery history data, older than 24 hours
go srv.DeliveryHistoryCleanUpService(_db)

// Starting http server on main thread
rest.RunHTTPServer(_db, _status, _redisClient)
}
44 changes: 22 additions & 22 deletions app/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,6 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// If block doesn't contain any tx, we'll attempt to persist only block
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

Expand All @@ -86,6 +75,17 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with 0 tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with 0 tx(s) [ Took : %s ]", block.NumberU64(), time.Now().UTC().Sub(startingAt)))
status.IncrementBlocksProcessed()
Expand Down Expand Up @@ -187,17 +187,6 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// If block doesn't contain any tx, we'll attempt to persist only block
if err := db.StoreBlock(_db, packedBlock, status); err != nil {

Expand All @@ -209,6 +198,17 @@ func ProcessBlockContent(client *ethclient.Client, block *types.Block, _db *gorm

}

if !HasBlockFinalized(status, packedBlock.Block.Number) {

log.Print(color.LightRed.Sprintf("[x] Non-final block %d with %d tx(s) [ Took : %s | Latest Block : %d | In Queue : %d ]", packedBlock.Block.Number, block.Transactions().Len(), time.Now().UTC().Sub(startingAt), status.GetLatestBlockNumber(), GetUnfinalizedQueueLength(redis)))

// Pushing into unfinalized block queue, to be picked up only when
// finality for this block has been achieved
PushBlockIntoUnfinalizedQueue(redis, fmt.Sprintf("%d", packedBlock.Block.Number))
return true

}

// Successfully processed block
log.Print(color.Green.Sprintf("[+] Block %d with %d tx(s) [ Took : %s ]", block.NumberU64(), block.Transactions().Len(), time.Now().UTC().Sub(startingAt)))

Expand Down
19 changes: 17 additions & 2 deletions app/block/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,24 @@ func SubscribeToNewBlocks(connection *d.BlockChainNodeConnection, _db *gorm.DB,
// Starting syncer in another thread, where it'll keep fetching
// blocks from highest block number it fetched last time to current network block number
// i.e. trying to fill up gap, which was caused when `ette` was offline

// Upper limit of syncing, in terms of block number
from := header.Number.Uint64() - 1
// Lower limit of syncing, in terms of block number
//
// Subtracting confirmation required block number count, due to
// the fact it might be case those block contents might have changed due to
// some reorg, in the time duration, when `ette` was offline
//
// Backward traversal mechanism gives us more recent blockchain happenings to cover
go SyncBlocksByRange(connection.RPC, _db, redis, header.Number.Uint64()-1, status.MaxBlockNumberAtStartUp(), status)
// So we've to take a look at those
to := status.MaxBlockNumberAtStartUp() - cfg.GetBlockConfirmations()

// block number can never be negative
if to < 0 {
to = 0
}

go SyncBlocksByRange(connection.RPC, _db, redis, from, to, status)

}
// Making sure that when next latest block header is received, it'll not
Expand Down
37 changes: 37 additions & 0 deletions app/data/stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package data

import "sync/atomic"

// ActiveSubscriptions - Keeps track of how many active websocket
// connections being maintained now by `ette`
type ActiveSubscriptions struct {
Count uint64
}

// Increment - Safely increment count by `X`
func (a *ActiveSubscriptions) Increment(by uint64) {
atomic.AddUint64(&a.Count, by)
}

// Decrement - Safely decrement count by `X`
func (a *ActiveSubscriptions) Decrement(by uint64) {
atomic.AddUint64(&a.Count, ^uint64(by-1))
}

// SendReceiveCounter - Keeps track of how many read & write ops
// were performed to & from socket during life time of one single
// websocket connection
type SendReceiveCounter struct {
Send uint64
Receive uint64
}

// IncrementSend -To be invoked when new data written into socket
func (s *SendReceiveCounter) IncrementSend(by uint64) {
atomic.AddUint64(&s.Send, by)
}

// IncrementReceive - To be invoked when new data read from socket
func (s *SendReceiveCounter) IncrementReceive(by uint64) {
atomic.AddUint64(&s.Receive, by)
}
Loading

0 comments on commit b53e479

Please sign in to comment.