Skip to content

Commit

Permalink
Fix log fetching in arbrollup
Browse files Browse the repository at this point in the history
  • Loading branch information
hkalodner committed Jan 13, 2020
1 parent 9a54e11 commit 3682d27
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 53 deletions.
89 changes: 36 additions & 53 deletions packages/arb-validator/ethbridge/arbRollupWatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package ethbridge
import (
"bytes"
"context"
"log"
"math/big"
"strings"

Expand Down Expand Up @@ -76,7 +75,6 @@ func init() {
}

type ethRollupWatcher struct {
Client *ethclient.Client
ArbRollup *rollup.ArbRollup
GlobalPendingInbox *rollup.IGlobalPendingInbox

Expand All @@ -86,7 +84,7 @@ type ethRollupWatcher struct {
}

func newRollupWatcher(address ethcommon.Address, client *ethclient.Client) (*ethRollupWatcher, error) {
vm := &ethRollupWatcher{Client: client, address: address}
vm := &ethRollupWatcher{client: client, address: address}
err := vm.setupContracts()
return vm, err
}
Expand Down Expand Up @@ -123,7 +121,7 @@ func (vm *ethRollupWatcher) rollupFilter() ethereum.FilterQuery {
}

func (vm *ethRollupWatcher) setupContracts() error {
arbitrumRollupContract, err := rollup.NewArbRollup(vm.address, vm.Client)
arbitrumRollupContract, err := rollup.NewArbRollup(vm.address, vm.client)
if err != nil {
return errors2.Wrap(err, "Failed to connect to arbRollup")
}
Expand All @@ -136,7 +134,7 @@ func (vm *ethRollupWatcher) setupContracts() error {
return errors2.Wrap(err, "Failed to get GlobalPendingInbox address")
}
vm.pendingInboxAddress = globalPendingInboxAddress
globalPendingContract, err := rollup.NewIGlobalPendingInbox(globalPendingInboxAddress, vm.Client)
globalPendingContract, err := rollup.NewIGlobalPendingInbox(globalPendingInboxAddress, vm.client)
if err != nil {
return errors2.Wrap(err, "Failed to connect to GlobalPendingInbox")
}
Expand All @@ -152,30 +150,24 @@ func (vm *ethRollupWatcher) StartConnection(ctx context.Context, outChan chan ar
}

headers := make(chan *types.Header)
headersSub, err := vm.Client.SubscribeNewHead(ctx, headers)
headersSub, err := vm.client.SubscribeNewHead(ctx, headers)
if err != nil {
return err
}

filter := vm.rollupFilter()
messagesFilter := vm.messageFilter()
logChan := make(chan types.Log, 1024)
logErrChan := make(chan error, 10)

logChan := make(chan types.Log)
logSub, err := vm.Client.SubscribeFilterLogs(ctx, filter, logChan)
if err != nil {
if err := getLogs(ctx, vm.client, vm.rollupFilter(), big.NewInt(0), logChan, logErrChan); err != nil {
return err
}

messagesLogChan := make(chan types.Log)
messagesLogSub, err := vm.Client.SubscribeFilterLogs(ctx, messagesFilter, messagesLogChan)
if err != nil {
if err := getLogs(ctx, vm.client, vm.messageFilter(), big.NewInt(0), logChan, logErrChan); err != nil {
return err
}

go func() {
defer headersSub.Unsubscribe()
defer messagesLogSub.Unsubscribe()
defer logSub.Unsubscribe()

for {
select {
Expand All @@ -187,23 +179,15 @@ func (vm *ethRollupWatcher) StartConnection(ctx context.Context, outChan chan ar
BlockHeight: header.Number,
Event: arbbridge.NewTimeEvent{},
}
case log := <-messagesLogChan:
if err := vm.processEvents(ctx, log, outChan); err != nil {
errChan <- err
return
}
case log := <-logChan:
if err := vm.processEvents(ctx, log, outChan); err != nil {
case ethLog := <-logChan:
if err := vm.processEvents(ctx, ethLog, outChan); err != nil {
errChan <- err
return
}
case err := <-headersSub.Err():
errChan <- err
return
case err := <-messagesLogSub.Err():
case err := <-logErrChan:
errChan <- err
return
case err := <-logSub.Err():
case err := <-headersSub.Err():
errChan <- err
return
}
Expand All @@ -212,19 +196,19 @@ func (vm *ethRollupWatcher) StartConnection(ctx context.Context, outChan chan ar
return nil
}

func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, outChan chan arbbridge.Notification) error {
func (vm *ethRollupWatcher) processEvents(ctx context.Context, ethLog types.Log, outChan chan arbbridge.Notification) error {
event, err := func() (arbbridge.Event, error) {
if log.Topics[0] == rollupStakeCreatedID {
eventVal, err := vm.ArbRollup.ParseRollupStakeCreated(log)
if ethLog.Topics[0] == rollupStakeCreatedID {
eventVal, err := vm.ArbRollup.ParseRollupStakeCreated(ethLog)
if err != nil {
return nil, err
}
return arbbridge.StakeCreatedEvent{
Staker: common.NewAddressFromEth(eventVal.Staker),
NodeHash: eventVal.NodeHash,
}, nil
} else if log.Topics[0] == rollupChallengeStartedID {
eventVal, err := vm.ArbRollup.ParseRollupChallengeStarted(log)
} else if ethLog.Topics[0] == rollupChallengeStartedID {
eventVal, err := vm.ArbRollup.ParseRollupChallengeStarted(ethLog)
if err != nil {
return nil, err
}
Expand All @@ -234,8 +218,8 @@ func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, ou
ChallengeType: structures.ChildType(eventVal.ChallengeType.Uint64()),
ChallengeContract: common.NewAddressFromEth(eventVal.ChallengeContract),
}, nil
} else if log.Topics[0] == rollupChallengeCompletedID {
eventVal, err := vm.ArbRollup.ParseRollupChallengeCompleted(log)
} else if ethLog.Topics[0] == rollupChallengeCompletedID {
eventVal, err := vm.ArbRollup.ParseRollupChallengeCompleted(ethLog)
if err != nil {
return nil, err
}
Expand All @@ -244,33 +228,33 @@ func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, ou
Loser: common.NewAddressFromEth(eventVal.Loser),
ChallengeContract: common.NewAddressFromEth(eventVal.ChallengeContract),
}, nil
} else if log.Topics[0] == rollupRefundedID {
eventVal, err := vm.ArbRollup.ParseRollupStakeRefunded(log)
} else if ethLog.Topics[0] == rollupRefundedID {
eventVal, err := vm.ArbRollup.ParseRollupStakeRefunded(ethLog)
if err != nil {
return nil, err
}
return arbbridge.StakeRefundedEvent{
Staker: common.NewAddressFromEth(eventVal.Staker),
}, nil
} else if log.Topics[0] == rollupPrunedID {
eventVal, err := vm.ArbRollup.ParseRollupPruned(log)
} else if ethLog.Topics[0] == rollupPrunedID {
eventVal, err := vm.ArbRollup.ParseRollupPruned(ethLog)
if err != nil {
return nil, err
}
return arbbridge.PrunedEvent{
Leaf: eventVal.Leaf,
}, nil
} else if log.Topics[0] == rollupStakeMovedID {
eventVal, err := vm.ArbRollup.ParseRollupStakeMoved(log)
} else if ethLog.Topics[0] == rollupStakeMovedID {
eventVal, err := vm.ArbRollup.ParseRollupStakeMoved(ethLog)
if err != nil {
return nil, err
}
return arbbridge.StakeMovedEvent{
Staker: common.NewAddressFromEth(eventVal.Staker),
Location: eventVal.ToNodeHash,
}, nil
} else if log.Topics[0] == rollupAssertedID {
eventVal, err := vm.ArbRollup.ParseRollupAsserted(log)
} else if ethLog.Topics[0] == rollupAssertedID {
eventVal, err := vm.ArbRollup.ParseRollupAsserted(ethLog)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -300,24 +284,24 @@ func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, ou
MaxPendingTop: eventVal.Fields[1],
MaxPendingCount: eventVal.PendingCount,
}, nil
} else if log.Topics[0] == rollupConfirmedID {
eventVal, err := vm.ArbRollup.ParseRollupConfirmed(log)
} else if ethLog.Topics[0] == rollupConfirmedID {
eventVal, err := vm.ArbRollup.ParseRollupConfirmed(ethLog)
if err != nil {
return nil, err
}
return arbbridge.ConfirmedEvent{
NodeHash: eventVal.NodeHash,
}, nil
} else if log.Topics[0] == confirmedAssertionID {
eventVal, err := vm.ArbRollup.ParseConfirmedAssertion(log)
} else if ethLog.Topics[0] == confirmedAssertionID {
eventVal, err := vm.ArbRollup.ParseConfirmedAssertion(ethLog)
if err != nil {
return nil, err
}
return arbbridge.ConfirmedAssertionEvent{
LogsAccHash: eventVal.LogsAccHash,
}, nil
} else if log.Topics[0] == messageDeliveredID {
val, err := vm.GlobalPendingInbox.ParseMessageDelivered(log)
} else if ethLog.Topics[0] == messageDeliveredID {
val, err := vm.GlobalPendingInbox.ParseMessageDelivered(ethLog)
if err != nil {
return nil, err
}
Expand All @@ -338,7 +322,7 @@ func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, ou

msgVal, _ := value.NewTupleFromSlice([]value.Value{
msgData,
value.NewIntValue(new(big.Int).SetUint64(log.BlockNumber)),
value.NewIntValue(new(big.Int).SetUint64(ethLog.BlockNumber)),
value.NewIntValue(msgHashInt),
})

Expand All @@ -354,7 +338,7 @@ func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, ou
return err
}
if event != nil {
header, err := vm.Client.HeaderByHash(ctx, log.BlockHash)
header, err := vm.client.HeaderByHash(ctx, ethLog.BlockHash)
if err != nil {
return err
}
Expand All @@ -363,14 +347,13 @@ func (vm *ethRollupWatcher) processEvents(ctx context.Context, log types.Log, ou
BlockHeight: header.Number,
VMID: common.NewAddressFromEth(vm.address),
Event: event,
TxHash: log.TxHash,
TxHash: ethLog.TxHash,
}
}
return nil
}

func (vm *ethRollupWatcher) GetParams(ctx context.Context) (structures.ChainParams, error) {
log.Println("Calling GetParams")
rawParams, err := vm.ArbRollup.VmParams(nil)
if err != nil {
return structures.ChainParams{}, err
Expand Down
96 changes: 96 additions & 0 deletions packages/arb-validator/ethbridge/logFetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2020, Offchain Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ethbridge

import (
"context"
"log"
"math/big"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

func getLogs(
ctx context.Context,
client *ethclient.Client,
filter ethereum.FilterQuery,
startHeight *big.Int,
logChan chan types.Log,
errChan chan error,
) error {
streamingLogChan := make(chan types.Log)
logSub, err := client.SubscribeFilterLogs(ctx, filter, streamingLogChan)
if err != nil {
return err
}
header, err := client.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
go func() {
defer close(logChan)
defer logSub.Unsubscribe()
// Get initial old logs
filter.FromBlock = startHeight
filter.ToBlock = header.Number
logs, err := client.FilterLogs(ctx, filter)
if err != nil {
errChan <- err
return
}
for _, ethLog := range logs {
log.Println("getLogs1", ethLog.BlockNumber, ethLog.TxIndex, ethLog.Index)
logChan <- ethLog
}

// Retreive for log from stream
ethLog := <-streamingLogChan
log.Println("getLogs2", ethLog.BlockNumber, ethLog.TxIndex, ethLog.Index)

// If there was a gap between initial retrieval and the stream, fill it in
if ethLog.BlockNumber > header.Number.Uint64() {
filter.FromBlock = header.Number
filter.ToBlock = new(big.Int).Sub(new(big.Int).SetUint64(ethLog.BlockNumber), big.NewInt(1))
logs, err := client.FilterLogs(ctx, filter)
if err != nil {
errChan <- err
return
}
for _, ethLog := range logs {
log.Println("getLogs3", ethLog.BlockNumber, ethLog.TxIndex, ethLog.Index)
logChan <- ethLog
}
}
logChan <- ethLog

for {
select {
case <-ctx.Done():
return
case ethLog := <-streamingLogChan:
log.Println("getLogs4", ethLog.BlockNumber, ethLog.TxIndex, ethLog.Index)
logChan <- ethLog
case err := <-logSub.Err():
errChan <- err
}
}
}()

return nil
}

0 comments on commit 3682d27

Please sign in to comment.