Skip to content

Commit

Permalink
Small changes
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Dec 2, 2023
1 parent a910ba0 commit 52d15e0
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 36 deletions.
51 changes: 34 additions & 17 deletions tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
hcf "github.com/hashicorp/go-hclog"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/blocktracker"
"github.com/umbracle/ethgo/jsonrpc"
)

// EventSubscriber is an interface that defines methods for handling tracked logs (events) from a blockchain
Expand Down Expand Up @@ -63,9 +64,6 @@ type EventTrackerConfig struct {
// Logger is the logger instance for event tracker
Logger hcf.Logger `json:"-"`

// Store is the store implementation for data that tracker saves (lastProcessedBlock and logs)
Store store.EventTrackerStore `json:"-"`

// BlockProvider is the implementation of a provider that returns blocks and logs from tracked chain
BlockProvider BlockProvider `json:"-"`

Expand All @@ -82,6 +80,8 @@ type EventTracker struct {

blockTracker blocktracker.BlockTrackerInterface
blockContainer *TrackerBlockContainer

store store.EventTrackerStore
}

// NewEventTracker is a constructor function that creates a new instance of the EventTracker struct.
Expand All @@ -108,34 +108,51 @@ type EventTracker struct {
//
// Inputs:
// - config (TrackerConfig): configuration of EventTracker.
// - store: implementation of EventTrackerStore interface
// - startBlockFromGenesis: block from which to start syncing
//
// Outputs:
// - A new instance of the EventTracker struct.
func NewEventTracker(config *EventTrackerConfig) (*EventTracker, error) {
lastProcessedBlock, err := config.Store.GetLastProcessedBlock()
func NewEventTracker(config *EventTrackerConfig, store store.EventTrackerStore,
startBlockFromGenesis uint64) (*EventTracker, error) {
lastProcessedBlock, err := store.GetLastProcessedBlock()
if err != nil {
return nil, err
}

if lastProcessedBlock == 0 && config.NumOfBlocksToReconcile > 0 {
latestBlock, err := config.BlockProvider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return nil, err
}

if latestBlock.Number > config.NumOfBlocksToReconcile {
// if this is a fresh start, then we should start syncing from
// latestBlock.Number - NumOfBlocksToReconcile
lastProcessedBlock = latestBlock.Number - config.NumOfBlocksToReconcile
lastProcessedBlock = startBlockFromGenesis

if err := config.Store.InsertLastProcessedBlock(lastProcessedBlock); err != nil {
if config.NumOfBlocksToReconcile > 0 {
latestBlock, err := config.BlockProvider.GetBlockByNumber(ethgo.Latest, false)
if err != nil {
return nil, err
}

if latestBlock.Number > config.NumOfBlocksToReconcile &&
startBlockFromGenesis < latestBlock.Number-config.NumOfBlocksToReconcile {
// if this is a fresh start, and we missed too much blocks,
// then we should start syncing from
// latestBlock.Number - NumOfBlocksToReconcile
lastProcessedBlock = latestBlock.Number - config.NumOfBlocksToReconcile
}
}
}

// if block provider is not provided externally,
// we can start the ethgo one
if config.BlockProvider == nil {
clt, err := jsonrpc.NewClient(config.RPCEndpoint)
if err != nil {
return nil, err
}

config.BlockProvider = clt.Eth()
}

return &EventTracker{
config: config,
store: store,
closeCh: make(chan struct{}),
blockTracker: blocktracker.NewJSONBlockTracker(config.BlockProvider),
blockContainer: NewTrackerBlockContainer(lastProcessedBlock),
Expand Down Expand Up @@ -414,7 +431,7 @@ func (e *EventTracker) processLogs() error {
}
}

if err := e.config.Store.InsertLastProcessedBlock(toBlock); err != nil {
if err := e.store.InsertLastProcessedBlock(toBlock); err != nil {
e.config.Logger.Error("Process logs failed on saving last processed block",
"fromBlock", fromBlock,
"toBlock", toBlock,
Expand All @@ -423,7 +440,7 @@ func (e *EventTracker) processLogs() error {
return err
}

if err := e.config.Store.InsertLogs(filteredLogs); err != nil {
if err := e.store.InsertLogs(filteredLogs); err != nil {
e.config.Logger.Error("Process logs failed on saving logs to store",
"fromBlock", fromBlock,
"toBlock", toBlock,
Expand Down
1 change: 1 addition & 0 deletions tracker/tracker_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func FuzzGetNewState(f *testing.F) {
eventTracker := &EventTracker{
config: testConfig,
blockContainer: NewTrackerBlockContainer(data.LastProcessed),
store: store.NewTestTrackerStore(t),
}

require.NoError(t, eventTracker.getNewState(&ethgo.Block{Number: data.Number}))
Expand Down
43 changes: 24 additions & 19 deletions tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
t.Run("Add block by block - no confirmed blocks", func(t *testing.T) {
t.Parallel()

tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), store.NewTestTrackerStore(t), 0)

require.NoError(t, err)

Expand All @@ -112,7 +112,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {

// check that the last processed block is 0, since we did not have any confirmed blocks
require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlockLocked())
lastProcessedBlockInStore, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedBlockInStore, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, uint64(0), lastProcessedBlockInStore)

Expand All @@ -131,7 +131,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return([]*ethgo.Log{}, nil).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
store.NewTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -164,7 +165,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// check if the last confirmed block processed is as expected
require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock)
// check that in memory cache removed processed confirmed logs
Expand Down Expand Up @@ -197,7 +198,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return(logs, nil).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
store.NewTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -230,12 +232,12 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// check if the last confirmed block processed is as expected
require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock)
// check if we have logs in store
for _, log := range logs {
logFromDB, err := tracker.config.Store.GetLog(log.BlockNumber, log.LogIndex)
logFromDB, err := tracker.store.GetLog(log.BlockNumber, log.LogIndex)
require.NoError(t, err)
require.Equal(t, log.Address, logFromDB.Address)
require.Equal(t, log.BlockNumber, log.BlockNumber)
Expand Down Expand Up @@ -265,7 +267,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock := new(mockProvider)
blockProviderMock.On("GetLogs", mock.Anything).Return(nil, errors.New("some error occurred")).Once()

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
store.NewTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -298,7 +301,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// check if the last confirmed block processed is as expected, in this case 0, because an error occurred
require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, uint64(0), lastProcessedConfirmedBlock)
// check that in memory cache nothing got removed, and that we have the latest block as well
Expand Down Expand Up @@ -335,7 +338,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
store.NewTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -369,11 +373,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
expectedLastProcessed := numOfMissedBlocks + 1 - numBlockConfirmations
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
// check if we have logs in store
logsFromDB, err := tracker.config.Store.GetAllLogs()
logsFromDB, err := tracker.store.GetAllLogs()
require.NoError(t, err)
require.Len(t, logsFromDB, len(logs))

Expand Down Expand Up @@ -421,7 +425,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(
int(numOfMissedBlocks + numOfCachedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
store.NewTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -468,11 +473,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
expectedLastProcessed := numOfMissedBlocks + numOfCachedBlocks + 1 - numBlockConfirmations
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
// check if we have logs in store
logsFromDB, err := tracker.config.Store.GetAllLogs()
logsFromDB, err := tracker.store.GetAllLogs()
require.NoError(t, err)
require.Len(t, logsFromDB, len(logs))

Expand Down Expand Up @@ -516,7 +521,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
// just mock the call, it will use the provider.blocks map to handle proper returns
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfCachedBlocks))

tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0))
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
store.NewTestTrackerStore(t), 0)
require.NoError(t, err)

tracker.config.BlockProvider = blockProviderMock
Expand Down Expand Up @@ -561,11 +567,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
expectedLastProcessed := numOfCachedBlocks + 1 - numBlockConfirmations
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
// check if the last confirmed block is saved in db as well
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
require.NoError(t, err)
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
// check if we have logs in store
logsFromDB, err := tracker.config.Store.GetAllLogs()
logsFromDB, err := tracker.store.GetAllLogs()
require.NoError(t, err)
require.Len(t, logsFromDB, len(logs))

Expand Down Expand Up @@ -597,7 +603,6 @@ func createTestTrackerConfig(t *testing.T,
LogFilter: map[ethgo.Address][]ethgo.Hash{
ethgo.ZeroAddress: {store.StateSyncEventABI.ID()},
},
Store: store.NewTestTrackerStore(t),
EventSubscriber: new(mockEventSubscriber),
BlockProvider: new(mockProvider),
}
Expand Down

0 comments on commit 52d15e0

Please sign in to comment.