Skip to content

Commit

Permalink
Merge pull request #993 from statechannels/fevm-with-events
Browse files Browse the repository at this point in the history
Add support for FEVM to our chain service
  • Loading branch information
lalexgap authored Dec 6, 2022
2 parents 9a25e38 + adc38ba commit 5d8bdf0
Show file tree
Hide file tree
Showing 11 changed files with 2,735 additions and 1,695 deletions.
15 changes: 14 additions & 1 deletion client/engine/chainservice/eth_chain_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,28 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
NitroAdjudicator "github.com/statechannels/go-nitro/client/engine/chainservice/adjudicator"
nt "github.com/statechannels/go-nitro/types"
)

// getAssetHoldings reads on-chain holdings for a channel,asset address, and block number
func getAssetHoldings(na *NitroAdjudicator.NitroAdjudicator, assetAddress common.Address, blockNumber *big.Int, channelId nt.Destination) (*big.Int, error) {
amount, err := na.Holdings(&bind.CallOpts{BlockNumber: blockNumber}, assetAddress, channelId)
if err != nil {
return big.NewInt(0), err
}

return amount, nil
}

// getChainHolding reads on-chain holdings for a channel and an asset address given a transaction and an event generated by the transaction.
func getChainHolding(na *NitroAdjudicator.NitroAdjudicator, tx *types.Transaction, event *NitroAdjudicator.NitroAdjudicatorAllocationUpdated) (common.Address, *big.Int, error) {

assetAddress, err := assetAddressForIndex(na, tx, event.AssetIndex)
if err != nil {
return assetAddress, &big.Int{}, err
}
amount, err := na.Holdings(&bind.CallOpts{BlockNumber: new(big.Int).SetUint64(event.Raw.BlockNumber)}, assetAddress, event.ChannelId)

amount, err := getAssetHoldings(na, assetAddress, new(big.Int).SetUint64(event.Raw.BlockNumber), event.ChannelId)
if err != nil {
return assetAddress, amount, err
}
Expand Down
227 changes: 176 additions & 51 deletions client/engine/chainservice/eth_chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -16,6 +17,8 @@ import (
Token "github.com/statechannels/go-nitro/client/engine/chainservice/erc20"
"github.com/statechannels/go-nitro/protocols"
"github.com/statechannels/go-nitro/types"

"github.com/ethereum/go-ethereum/rpc"
)

var allocationUpdatedTopic = crypto.Keccak256Hash([]byte("AllocationUpdated(bytes32,uint256,uint256)"))
Expand All @@ -38,6 +41,10 @@ type EthChainService struct {
logger *log.Logger
}

// Since we only fetch events if there's a new block number
// it's safe to poll relatively frequently.
const EVENT_POLL_INTERVAL = 500 * time.Millisecond

// RESUB_INTERVAL is how often we resubscribe to log events.
// We do this to avoid https://github.com/ethereum/go-ethereum/issues/23845
// We use 2.5 minutes as the default filter timeout is 5 minutes.
Expand All @@ -53,18 +60,47 @@ func NewEthChainService(chain ethChain, na *NitroAdjudicator.NitroAdjudicator,
// Use a buffered channel so we don't have to worry about blocking on writing to the channel.
ecs := EthChainService{chain, na, naAddress, caAddress, vpaAddress, txSigner, make(chan Event, 10), logger}

err := ecs.subcribeToEvents()
return &ecs, err
if ecs.subscriptionsSupported() {
logger.Printf("Notifications are supported by the chain. Using notifications to listen for events.")
go ecs.subscribeForLogs(context.Background())
} else {
logger.Printf("Notifications are NOT supported by the chain. Using polling to listen for events.")
go ecs.pollForLogs(context.Background())
}
return &ecs, nil
}

// subscriptionsSupported returns true if the node supports subscriptions for events.
// Otherwise returns false
func (ecs *EthChainService) subscriptionsSupported() bool {
// This is slightly painful but seems like the only way to find out if notifications are supported
// We attempt to subscribe (with a query that should never return a result) and check the error
query := ethereum.FilterQuery{
Addresses: []common.Address{ecs.naAddress},
FromBlock: big.NewInt(0),
ToBlock: big.NewInt(0),
}

logs := make(chan ethTypes.Log, 1)
sub, err := ecs.chain.SubscribeFilterLogs(context.Background(), query, logs)
if err == rpc.ErrNotificationsUnsupported {
return false
}

sub.Unsubscribe()
return true
}

// defaultTxOpts returns transaction options suitable for most transaction submissions
func (ecs *EthChainService) defaultTxOpts() *bind.TransactOpts {
return &bind.TransactOpts{
From: ecs.txSigner.From,
Nonce: ecs.txSigner.Nonce,
Signer: ecs.txSigner.Signer,
GasPrice: ecs.txSigner.GasPrice,
GasLimit: ecs.txSigner.GasLimit,
From: ecs.txSigner.From,
Nonce: ecs.txSigner.Nonce,
Signer: ecs.txSigner.Signer,
GasFeeCap: ecs.txSigner.GasFeeCap,
GasTipCap: ecs.txSigner.GasTipCap,
GasLimit: ecs.txSigner.GasLimit,
GasPrice: ecs.txSigner.GasPrice,
}
}

Expand Down Expand Up @@ -116,35 +152,119 @@ func (ecs *EthChainService) SendTransaction(tx protocols.ChainTransaction) error
return fmt.Errorf("unexpected transaction type %T", tx)
}
}
func (ecs *EthChainService) subcribeToEvents() error {

go ecs.listenForLogEvents()
return nil
// fatalError is called to output the error and then panic, killing the chain service.
// If prints out the error to STDOUT, the logger and then exits the program.
func (ecs *EthChainService) fatalError(err error) {
ecs.fatalF("FATAL ERROR\n%+v", err)
}

// fatalF is called to output a message and then panic, killing the chain service.
// It accepts a format string and arguments, as per fmt.Printf.
// If prints out the error to STDOUT, the logger and then exits the program.
func (ecs *EthChainService) fatalF(format string, v ...any) {

// Print to STDOUT in case we're using a noop logger
fmt.Println(fmt.Errorf(format, v...))
// FatalF prints to the logger then calls exit(1)
ecs.logger.Fatalf(format, v...)

// Manually panic in case we're using a logger that doesn't call exit(1)
panic(fmt.Errorf(format, v...))

}

// dispatchChainEvents takes in a collection of event logs from the chain
// and dispatches events to the out channel
func (ecs *EthChainService) dispatchChainEvents(logs []ethTypes.Log) {
for _, l := range logs {
switch l.Topics[0] {
case depositedTopic:
nad, err := ecs.na.ParseDeposited(l)
if err != nil {
ecs.fatalF("error in ParseDeposited: %v", err)
}

event := NewDepositedEvent(nad.Destination, l.BlockNumber, nad.Asset, nad.AmountDeposited, nad.DestinationHoldings)
ecs.out <- event
case allocationUpdatedTopic:
au, err := ecs.na.ParseAllocationUpdated(l)
if err != nil {
ecs.fatalF("error in ParseAllocationUpdated: %v", err)

}

tx, pending, err := ecs.chain.TransactionByHash(context.Background(), l.TxHash)
if pending {
ecs.fatalF("Expected transaction to be part of the chain, but the transaction is pending")

}
var assetAddress types.Address
var amount *big.Int

if err != nil {
ecs.fatalF("error in TransactionByHash: %v", err)
}

assetAddress, amount, err = getChainHolding(ecs.na, tx, au)
if err != nil {
ecs.fatalF("error in getChainHoldings: %v", err)
}

event := NewAllocationUpdatedEvent(au.ChannelId, l.BlockNumber, assetAddress, amount)
ecs.out <- event
case concludedTopic:
ce, err := ecs.na.ParseConcluded(l)
if err != nil {
ecs.fatalF("error in ParseConcluded: %v", err)

}

event := ConcludedEvent{commonEvent: commonEvent{channelID: ce.ChannelId, BlockNum: l.BlockNumber}}
ecs.out <- event

default:
ecs.fatalF("Unknown chain event")
}
}

}

// getCurrentBlockNumber returns the current block number.
func (ecs *EthChainService) getCurrentBlockNum() *big.Int {
h, err := ecs.chain.HeaderByNumber(context.Background(), nil)
if err != nil {
ecs.fatalError(err)
}

return h.Number
}

func (ecs *EthChainService) listenForLogEvents() {
// Subsribe to Adjudicator events
// subscribeForLogs subscribes for logs and pushes them to the out channel.
// It relies on notifications being supported by the chain node.
func (ecs *EthChainService) subscribeForLogs(ctx context.Context) {
// Subscribe to Adjudicator events
query := ethereum.FilterQuery{
Addresses: []common.Address{ecs.naAddress},
}
logs := make(chan ethTypes.Log)
sub, err := ecs.chain.SubscribeFilterLogs(context.Background(), query, logs)
sub, err := ecs.chain.SubscribeFilterLogs(ctx, query, logs)
if err != nil {
panic(err)
ecs.fatalError(err)
}
for {
select {
case err := <-sub.Err():
if err != nil {
panic(err)
ecs.fatalError(err)
}

// If the error is nil then the subscription was closed and we need to re-subscribe.
// This is a workaround for https://github.com/ethereum/go-ethereum/issues/23845
var sErr error
sub, sErr = ecs.chain.SubscribeFilterLogs(context.Background(), query, logs)
sub, sErr = ecs.chain.SubscribeFilterLogs(ctx, query, logs)
if sErr != nil {
panic(err)
ecs.fatalError(err)
}
ecs.logger.Println("resubscribed to filtered logs")

Expand All @@ -153,47 +273,52 @@ func (ecs *EthChainService) listenForLogEvents() {
// We unsub here and recreate the subscription in the next iteration of the select.
sub.Unsubscribe()
case chainEvent := <-logs:
switch chainEvent.Topics[0] {
case depositedTopic:
nad, err := ecs.na.ParseDeposited(chainEvent)
if err != nil {
ecs.logger.Printf("error in ParseDeposited: %v", err)
}
ecs.dispatchChainEvents([]ethTypes.Log{chainEvent})

event := NewDepositedEvent(nad.Destination, chainEvent.BlockNumber, nad.Asset, nad.AmountDeposited, nad.DestinationHoldings)
ecs.out <- event
case allocationUpdatedTopic:
au, err := ecs.na.ParseAllocationUpdated(chainEvent)
if err != nil {
ecs.logger.Printf("error in ParseAllocationUpdated: %v", err)
}
}
}

tx, pending, err := ecs.chain.TransactionByHash(context.Background(), chainEvent.TxHash)
if pending {
ecs.logger.Printf("Expected transacion to be part of the chain, but the transaction is pending")
}
if err != nil {
ecs.logger.Printf("error in TransactoinByHash: %v", err)
}
}

// pollForLogs periodically polls the chain client to check if there new events.
// It can function over a chain node that does not support notifications.
func (ecs *EthChainService) pollForLogs(ctx context.Context) {

// TODO: We are currently querying from the genesis block to the current block.
// We could make this more performant by querying from the nitro adjudicator contract deployment block.
query := ethereum.FilterQuery{
Addresses: []common.Address{ecs.naAddress},
FromBlock: nil,
ToBlock: ecs.getCurrentBlockNum(),
}

fetchedLogs, err := ecs.chain.FilterLogs(context.Background(), query)

if err != nil {
ecs.fatalError(err)
}

ecs.dispatchChainEvents(fetchedLogs)
for {
select {
case <-time.After(EVENT_POLL_INTERVAL):
currentBlock := ecs.getCurrentBlockNum()

if moreRecentBlockAvailable := currentBlock.Cmp(query.ToBlock) > 0; moreRecentBlockAvailable {
// The query includes the from and to blocks so we need to increment the from block to avoid duplicating events
query.FromBlock = big.NewInt(0).Add(query.ToBlock, big.NewInt(1))
query.ToBlock = big.NewInt(0).Set(currentBlock)

fetchedLogs, err := ecs.chain.FilterLogs(context.Background(), query)

assetAddress, amount, err := getChainHolding(ecs.na, tx, au)
if err != nil {
ecs.logger.Printf("error in getChainHoldings: %v", err)
}
event := NewAllocationUpdatedEvent(au.ChannelId, chainEvent.BlockNumber, assetAddress, amount)
ecs.out <- event
case concludedTopic:
ce, err := ecs.na.ParseConcluded(chainEvent)
if err != nil {
ecs.logger.Printf("error in ParseConcluded: %v", err)
ecs.fatalError(err)
}

event := ConcludedEvent{commonEvent: commonEvent{channelID: ce.ChannelId, BlockNum: chainEvent.BlockNumber}}
ecs.out <- event

default:
ecs.logger.Printf("Unknown chain event")
ecs.dispatchChainEvents(fetchedLogs)
}
case <-ctx.Done():
return
}
}

Expand Down
Loading

0 comments on commit 5d8bdf0

Please sign in to comment.