diff --git a/dataRetriever/requestHandlers/requestHandler.go b/dataRetriever/requestHandlers/requestHandler.go index a95cd19f7ef..333e132ac79 100644 --- a/dataRetriever/requestHandlers/requestHandler.go +++ b/dataRetriever/requestHandlers/requestHandler.go @@ -116,8 +116,8 @@ func (rrh *resolverRequestHandler) RequestTransaction(destShardID uint32, txHash rrh.requestByHashes(destShardID, txHashes, factory.TransactionTopic, uniqueTxSuffix) } -// RequestReceipts method ask for receipts from the connected peers -func (rrh *resolverRequestHandler) RequestReceipts(receiptsHashes [][]byte) { +// RequestReceiptsTrieNodes method ask for receipts from the connected peers +func (rrh *resolverRequestHandler) RequestReceiptsTrieNodes(receiptsHashes [][]byte) { rrh.requestByHashes(rrh.shardID, receiptsHashes, factory.ReceiptTopic, uniqueReceiptSuffix) } diff --git a/epochStart/bootstrap/process_test.go b/epochStart/bootstrap/process_test.go index 33117a335dc..c93ac9c15b0 100644 --- a/epochStart/bootstrap/process_test.go +++ b/epochStart/bootstrap/process_test.go @@ -979,6 +979,9 @@ func TestCreateSyncers(t *testing.T) { HeartbeatsCalled: func() storage.Cacher { return testscommon.NewCacherStub() }, + ReceiptsCalled: func() storage.Cacher { + return testscommon.NewCacherStub() + }, } epochStartProvider.whiteListHandler = &testscommon.WhiteListHandlerStub{} epochStartProvider.whiteListerVerifiedTxs = &testscommon.WhiteListHandlerStub{} @@ -2394,6 +2397,9 @@ func TestSyncSetGuardianTransaction(t *testing.T) { HeartbeatsCalled: func() storage.Cacher { return testscommon.NewCacherStub() }, + ReceiptsCalled: func() storage.Cacher { + return testscommon.NewCacherStub() + }, } epochStartProvider.whiteListHandler = &testscommon.WhiteListHandlerStub{ IsWhiteListedCalled: func(interceptedData process.InterceptedData) bool { diff --git a/genesis/process/disabled/requestHandler.go b/genesis/process/disabled/requestHandler.go index a1f26781b7d..25ce28a87aa 100644 --- a/genesis/process/disabled/requestHandler.go +++ b/genesis/process/disabled/requestHandler.go @@ -6,6 +6,10 @@ import "time" type RequestHandler struct { } +// RequestReceiptsTrieNodes - +func (r *RequestHandler) RequestReceiptsTrieNodes(_ [][]byte) { +} + // SetEpoch does nothing func (r *RequestHandler) SetEpoch(_ uint32) { } diff --git a/go.mod b/go.mod index cc810550931..6cd571a2866 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/klauspost/cpuid/v2 v2.2.5 github.com/mitchellh/mapstructure v1.5.0 github.com/multiversx/mx-chain-communication-go v1.1.0 - github.com/multiversx/mx-chain-core-go v1.2.22-0.20240821080303-1673ce1a0fd8 + github.com/multiversx/mx-chain-core-go v1.2.23-0.20240924080818-9cfce0926498 github.com/multiversx/mx-chain-crypto-go v1.2.12 github.com/multiversx/mx-chain-es-indexer-go v1.7.4 github.com/multiversx/mx-chain-logger-go v1.0.15 diff --git a/go.sum b/go.sum index 21fd4313714..4907961ae3e 100644 --- a/go.sum +++ b/go.sum @@ -387,8 +387,8 @@ github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUY github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o= github.com/multiversx/mx-chain-communication-go v1.1.0 h1:J7bX6HoN3HiHY7cUeEjG8AJWgQDDPcY+OPDOsSUOkRE= github.com/multiversx/mx-chain-communication-go v1.1.0/go.mod h1:WK6bP4pGEHGDDna/AYRIMtl6G9OA0NByI1Lw8PmOnRM= -github.com/multiversx/mx-chain-core-go v1.2.22-0.20240821080303-1673ce1a0fd8 h1:zlm+Brw3jPj5lQ8m7wGLwYWD4ebWArGd5qE5/99qIWQ= -github.com/multiversx/mx-chain-core-go v1.2.22-0.20240821080303-1673ce1a0fd8/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE= +github.com/multiversx/mx-chain-core-go v1.2.23-0.20240924080818-9cfce0926498 h1:BBBAzI/Mg6FVvxi/YHbUbFM8teVGX+MxsHdOvbXd8zg= +github.com/multiversx/mx-chain-core-go v1.2.23-0.20240924080818-9cfce0926498/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE= github.com/multiversx/mx-chain-crypto-go v1.2.12 h1:zWip7rpUS4CGthJxfKn5MZfMfYPjVjIiCID6uX5BSOk= github.com/multiversx/mx-chain-crypto-go v1.2.12/go.mod h1:HzcPpCm1zanNct/6h2rIh+MFrlXbjA5C8+uMyXj3LI4= github.com/multiversx/mx-chain-es-indexer-go v1.7.4 h1:SjJk9G9SN8baz0sFIU2jymYCfx3XiikGEB2wW0jwvfw= diff --git a/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go index cfed22b39c9..df0798da517 100644 --- a/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/baseInterceptorsContainerFactory.go @@ -213,6 +213,50 @@ func createTopicAndAssignHandlerOnMessenger( return messenger.RegisterMessageProcessor(topic, common.DefaultInterceptorsIdentifier, interceptor) } +func (bicf *baseInterceptorsContainerFactory) generateReceiptNodeInterceptor() error { + shardC := bicf.shardCoordinator + + keys := make([]string, 0) + interceptorsSlice := make([]process.Interceptor, 0) + + receiptNodesInterceptorProcessor, err := processor.NewReceiptsNodesInterceptorProcessor(bicf.dataPool.Receipts()) + if err != nil { + return err + } + + interceptedTrieNodeFactory, err := interceptorFactory.NewInterceptedTrieNodeDataFactory(bicf.argInterceptorFactory) + if err != nil { + return err + } + + internalMarshaller := bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer() + topic := factory.ReceiptTopic + shardC.CommunicationIdentifier(core.MetachainShardId) + multiDataInterceptor, err := interceptors.NewMultiDataInterceptor(interceptors.ArgMultiDataInterceptor{ + Topic: topic, + Marshalizer: internalMarshaller, + DataFactory: interceptedTrieNodeFactory, + Processor: receiptNodesInterceptorProcessor, + Throttler: bicf.globalThrottler, + AntifloodHandler: bicf.antifloodHandler, + WhiteListRequest: bicf.whiteListHandler, + CurrentPeerId: bicf.mainMessenger.ID(), + PreferredPeersHolder: bicf.preferredPeersHolder, + }) + if err != nil { + return err + } + + interceptor, err := bicf.createTopicAndAssignHandler(topic, multiDataInterceptor, true) + if err != nil { + return err + } + + keys = append(keys, topic) + interceptorsSlice = append(interceptorsSlice, interceptor) + + return bicf.addInterceptorsToContainers(keys, interceptorsSlice) +} + // ------- Tx interceptors func (bicf *baseInterceptorsContainerFactory) generateTxInterceptors() error { diff --git a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go index 31a4344b771..139bbcfa3f5 100644 --- a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory.go @@ -196,6 +196,11 @@ func (micf *metaInterceptorsContainerFactory) Create() (process.InterceptorsCont return nil, nil, err } + err = micf.generateReceiptNodeInterceptor() + if err != nil { + return nil, nil, err + } + return micf.mainContainer, micf.fullArchiveContainer, nil } diff --git a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go index b9124001264..38b5cf8d529 100644 --- a/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go +++ b/process/factory/interceptorscontainer/metaInterceptorsContainerFactory_test.go @@ -78,6 +78,9 @@ func createMetaDataPools() dataRetriever.PoolsHolder { RewardTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier { return testscommon.NewShardedDataStub() }, + ReceiptsCalled: func() storage.Cacher { + return testscommon.NewCacherStub() + }, } return pools @@ -605,10 +608,11 @@ func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { numInterceptorsHeartbeatForMetachain := 1 numInterceptorsShardValidatorInfoForMetachain := 1 numInterceptorValidatorInfo := 1 + numInterceptorReceiptsData := 1 totalInterceptors := numInterceptorsMetablock + numInterceptorsShardHeadersForMetachain + numInterceptorsTrieNodes + numInterceptorsTransactionsForMetachain + numInterceptorsUnsignedTxsForMetachain + numInterceptorsMiniBlocksForMetachain + numInterceptorsRewardsTxsForMetachain + numInterceptorsPeerAuthForMetachain + numInterceptorsHeartbeatForMetachain + - numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo + numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo + numInterceptorReceiptsData assert.Nil(t, err) assert.Equal(t, totalInterceptors, mainContainer.Len()) @@ -655,10 +659,11 @@ func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { numInterceptorsHeartbeatForMetachain := 1 numInterceptorsShardValidatorInfoForMetachain := 1 numInterceptorValidatorInfo := 1 + numInterceptorReceiptsData := 1 totalInterceptors := numInterceptorsMetablock + numInterceptorsShardHeadersForMetachain + numInterceptorsTrieNodes + numInterceptorsTransactionsForMetachain + numInterceptorsUnsignedTxsForMetachain + numInterceptorsMiniBlocksForMetachain + numInterceptorsRewardsTxsForMetachain + numInterceptorsPeerAuthForMetachain + numInterceptorsHeartbeatForMetachain + - numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo + numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo + numInterceptorReceiptsData assert.Nil(t, err) assert.Equal(t, totalInterceptors, mainContainer.Len()) diff --git a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go index 26224fbc152..385214cce85 100644 --- a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go +++ b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory.go @@ -195,6 +195,11 @@ func (sicf *shardInterceptorsContainerFactory) Create() (process.InterceptorsCon return nil, nil, err } + err = sicf.generateReceiptNodeInterceptor() + if err != nil { + return nil, nil, err + } + return sicf.mainContainer, sicf.fullArchiveContainer, nil } diff --git a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go index f802562ae35..b6bf3338c64 100644 --- a/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go +++ b/process/factory/interceptorscontainer/shardInterceptorsContainerFactory_test.go @@ -88,6 +88,9 @@ func createShardDataPools() dataRetriever.PoolsHolder { pools.CurrBlockTxsCalled = func() dataRetriever.TransactionCacher { return &mock.TxForCurrentBlockStub{} } + pools.ReceiptsCalled = func() storage.Cacher { + return testscommon.NewCacherStub() + } return pools } @@ -610,9 +613,11 @@ func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { numInterceptorHeartbeat := 1 numInterceptorsShardValidatorInfo := 1 numInterceptorValidatorInfo := 1 + numInterceptorReceiptsData := 1 totalInterceptors := numInterceptorTxs + numInterceptorsUnsignedTxs + numInterceptorsRewardTxs + numInterceptorHeaders + numInterceptorMiniBlocks + numInterceptorMetachainHeaders + numInterceptorTrieNodes + - numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo + numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo + + numInterceptorReceiptsData assert.Nil(t, err) assert.Equal(t, totalInterceptors, mainContainer.Len()) @@ -658,9 +663,11 @@ func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) { numInterceptorHeartbeat := 1 numInterceptorsShardValidatorInfo := 1 numInterceptorValidatorInfo := 1 + numInterceptorReceiptsData := 1 totalInterceptors := numInterceptorTxs + numInterceptorsUnsignedTxs + numInterceptorsRewardTxs + numInterceptorHeaders + numInterceptorMiniBlocks + numInterceptorMetachainHeaders + numInterceptorTrieNodes + - numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo + numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo + + numInterceptorReceiptsData assert.Nil(t, err) assert.Equal(t, totalInterceptors, mainContainer.Len()) diff --git a/process/interceptors/processor/receiptsDataNodeInterceptorProcessor.go b/process/interceptors/processor/receiptsDataNodeInterceptorProcessor.go new file mode 100644 index 00000000000..c12e2f51962 --- /dev/null +++ b/process/interceptors/processor/receiptsDataNodeInterceptorProcessor.go @@ -0,0 +1,52 @@ +package processor + +import ( + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-go/process" + "github.com/multiversx/mx-chain-go/storage" +) + +var _ process.InterceptorProcessor = (*receiptsNodeInterceptorProcessor)(nil) + +// receiptsNodeInterceptorProcessor is the processor used when intercepting receipts nodes +type receiptsNodeInterceptorProcessor struct { + interceptedNodes storage.Cacher +} + +// NewReceiptsNodesInterceptorProcessor creates a new instance of ReceiptsNodeInterceptorProcessor +func NewReceiptsNodesInterceptorProcessor(interceptedNodes storage.Cacher) (*receiptsNodeInterceptorProcessor, error) { + if check.IfNil(interceptedNodes) { + return nil, process.ErrNilCacher + } + + return &receiptsNodeInterceptorProcessor{ + interceptedNodes: interceptedNodes, + }, nil +} + +// Validate checks if the intercepted data can be processed +func (p *receiptsNodeInterceptorProcessor) Validate(_ process.InterceptedData, _ core.PeerID) error { + return nil +} + +// Save saves the intercepted trie node in the intercepted nodes cacher +func (p *receiptsNodeInterceptorProcessor) Save(data process.InterceptedData, _ core.PeerID, _ string) error { + nodeData, ok := data.(interceptedDataSizeHandler) + if !ok { + return process.ErrWrongTypeAssertion + } + + p.interceptedNodes.Put(data.Hash(), nodeData, nodeData.SizeInBytes()+len(data.Hash())) + return nil +} + +// RegisterHandler registers a callback function to be notified of incoming trie nodes +func (p *receiptsNodeInterceptorProcessor) RegisterHandler(_ func(topic string, hash []byte, data interface{})) { + log.Error("ReceiptsNodeInterceptorProcessor.RegisterHandler", "error", "not implemented") +} + +// IsInterfaceNil returns true if there is no value under the interface +func (p *receiptsNodeInterceptorProcessor) IsInterfaceNil() bool { + return p == nil +} diff --git a/process/interface.go b/process/interface.go index 8e943d0a44e..5955ad7acf9 100644 --- a/process/interface.go +++ b/process/interface.go @@ -600,6 +600,7 @@ type RequestHandler interface { RequestTrieNode(requestHash []byte, topic string, chunkIndex uint32) CreateTrieNodeIdentifier(requestHash []byte, chunkIndex uint32) []byte RequestPeerAuthenticationsByHashes(destShardID uint32, hashes [][]byte) + RequestReceiptsTrieNodes(receiptsHashes [][]byte) RequestValidatorInfo(hash []byte) RequestValidatorsInfo(hashes [][]byte) IsInterfaceNil() bool diff --git a/process/receiptslog/errors.go b/process/receiptslog/errors.go index 3fc34df45e9..d9d3fa11742 100644 --- a/process/receiptslog/errors.go +++ b/process/receiptslog/errors.go @@ -4,3 +4,9 @@ import "errors" // ErrNilTrieInteractor signals that a nil trie interactor has been provided var ErrNilTrieInteractor = errors.New("trie interactor is nil") + +// ErrNilReceiptsDataSyncer signals that a nil receipt data syncer has been provided +var ErrNilReceiptsDataSyncer = errors.New("receipts data syncer is nil") + +// ErrReceiptTrieRootHashDoesNotMatch signal that the receipts trie root hash does not match +var ErrReceiptTrieRootHashDoesNotMatch = errors.New("receipts trie root hash does not match") diff --git a/process/receiptslog/interface.go b/process/receiptslog/interface.go index b88b230e966..d3295fd4968 100644 --- a/process/receiptslog/interface.go +++ b/process/receiptslog/interface.go @@ -1,12 +1,20 @@ package receiptslog -import "github.com/multiversx/mx-chain-core-go/data/state" +import ( + "context" + "github.com/multiversx/mx-chain-core-go/data/state" + "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/storage" +) // Interactor defines what a trie interactor should be able to do type Interactor interface { CreateNewTrie() error AddReceiptData(receiptData state.Receipt) error Save() ([]byte, error) + GetSerializedNode(nodeHash []byte) ([]byte, error) + RecreateTrieFromDB(rootHash []byte, db storage.Persister) (common.Trie, error) + SaveNewTrie(localTrie common.Trie) ([]byte, error) IsInterfaceNil() bool } @@ -16,3 +24,11 @@ type ReceiptsManagerHandler interface { SyncReceiptsTrie(receiptsRootHash []byte) error IsInterfaceNil() bool } + +// ReceiptsDataSyncer defines what a receipts data syncer should be able to do +type ReceiptsDataSyncer interface { + SyncReceiptsDataFor(hashes [][]byte, ctx context.Context) error + GetReceiptsData() (map[string][]byte, error) + ClearFields() + IsInterfaceNil() bool +} diff --git a/process/receiptslog/receiptsManager.go b/process/receiptslog/receiptsManager.go index c405e4481fb..38ea4184d32 100644 --- a/process/receiptslog/receiptsManager.go +++ b/process/receiptslog/receiptsManager.go @@ -1,13 +1,26 @@ package receiptslog import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/data/state" + "github.com/multiversx/mx-chain-core-go/hashing" + "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/storage" + "github.com/multiversx/mx-chain-go/storage/database" + "github.com/multiversx/mx-chain-go/trie" ) // ArgsReceiptsManager is the structure that holds the components needed to a new receipts manager type ArgsReceiptsManager struct { - TrieHandler Interactor + TrieHandler Interactor + ReceiptsDataSyncer ReceiptsDataSyncer + Marshaller marshal.Marshalizer + Hasher hashing.Hasher } // ArgsGenerateReceiptsAndSave is the DTO needed to provided input data to generate receipts @@ -17,7 +30,10 @@ type ArgsGenerateReceiptsAndSave struct { } type receiptsManager struct { - trieInteractor Interactor + trieInteractor Interactor + receiptsDataSyncer ReceiptsDataSyncer + marshaller marshal.Marshalizer + hasher hashing.Hasher } // NewReceiptsManager will create a new instance of receipts manager @@ -25,9 +41,15 @@ func NewReceiptsManager(args ArgsReceiptsManager) (*receiptsManager, error) { if check.IfNil(args.TrieHandler) { return nil, ErrNilTrieInteractor } + if check.IfNil(args.ReceiptsDataSyncer) { + return nil, ErrNilReceiptsDataSyncer + } return &receiptsManager{ - trieInteractor: args.TrieHandler, + trieInteractor: args.TrieHandler, + receiptsDataSyncer: args.ReceiptsDataSyncer, + marshaller: args.Marshaller, + hasher: args.Hasher, }, nil } @@ -58,6 +80,80 @@ func (rm *receiptsManager) GenerateReceiptsTrieAndSaveDataInStorage(args ArgsGen // SyncReceiptsTrie will sync the receipts trie from network func (rm *receiptsManager) SyncReceiptsTrie(receiptsRootHash []byte) error { + nodesMap, err := rm.syncBranchNodesData(receiptsRootHash) + if err != nil { + return err + } + + memoryDB := database.NewMemDB() + leafNodesHashes, err := trie.GetLeafHashesAndPutNodesInRamStorage(nodesMap, memoryDB, rm.hasher, rm.marshaller) + if err != nil { + return err + } + + err = rm.syncLeafNodesAndPutInStorer(leafNodesHashes, memoryDB) + if err != nil { + return err + } + + newTrie, err := rm.trieInteractor.RecreateTrieFromDB(receiptsRootHash, memoryDB) + if err != nil { + return err + } + + newTrieRootHash, err := rm.trieInteractor.SaveNewTrie(newTrie) + if err != nil { + return err + } + + if !bytes.Equal(newTrieRootHash, receiptsRootHash) { + return fmt.Errorf("%v , expected=%s, actual=%s", ErrReceiptTrieRootHashDoesNotMatch, hex.EncodeToString(receiptsRootHash), hex.EncodeToString(newTrieRootHash)) + } + + return nil +} + +func (rm *receiptsManager) syncBranchNodesData(receiptsRootHash []byte) (map[string][]byte, error) { + receiptsDataMap, err := rm.syncData([][]byte{receiptsRootHash}) + if err != nil { + return nil, err + } + + receiptTrieBranchNodesBytes := receiptsDataMap[string(receiptsRootHash)] + + serializedNodes := state.NewSerializedNodesMap() + err = rm.marshaller.Unmarshal(serializedNodes, receiptTrieBranchNodesBytes) + if err != nil { + return nil, err + } + + return serializedNodes.SerializedNodes, nil +} + +func (rm *receiptsManager) syncData(hashes [][]byte) (map[string][]byte, error) { + err := rm.receiptsDataSyncer.SyncReceiptsDataFor(hashes, context.Background()) + if err != nil { + return nil, err + } + + defer rm.receiptsDataSyncer.ClearFields() + + return rm.receiptsDataSyncer.GetReceiptsData() +} + +func (rm *receiptsManager) syncLeafNodesAndPutInStorer(hashes [][]byte, db storage.Persister) error { + leafNodesMap, err := rm.syncData(hashes) + if err != nil { + return err + } + + for leafHash, leafBytes := range leafNodesMap { + err = db.Put([]byte(leafHash), leafBytes) + if err != nil { + return err + } + } + return nil } diff --git a/process/receiptslog/receiptsManagerCreator.go b/process/receiptslog/receiptsManagerCreator.go index 57e3516a0ee..385897316be 100644 --- a/process/receiptslog/receiptsManagerCreator.go +++ b/process/receiptslog/receiptsManagerCreator.go @@ -6,11 +6,13 @@ import ( "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/storage" + updateSync "github.com/multiversx/mx-chain-go/update/sync" ) // ArgsCreateReceiptsManager holds all the components needed to create a receipts manager type ArgsCreateReceiptsManager struct { ReceiptDataStorer storage.Storer + ReceiptDataCacher storage.Cacher Marshaller marshal.Marshalizer Hasher hashing.Hasher EnableEpochsHandler common.EnableEpochsHandler @@ -29,7 +31,16 @@ func CreateReceiptsManager(args ArgsCreateReceiptsManager) (*receiptsManager, er return nil, err } + receiptsDataSyncer, err := updateSync.NewReceiptsDataSyncer(updateSync.ArgsNewReceiptsDataSyncer{ + Cache: args.ReceiptDataCacher, + RequestHandler: args.RequestHandler, + }) + if err != nil { + return nil, err + } + return NewReceiptsManager(ArgsReceiptsManager{ - TrieHandler: trieHandler, + TrieHandler: trieHandler, + ReceiptsDataSyncer: receiptsDataSyncer, }) } diff --git a/process/receiptslog/receiptsManager_test.go b/process/receiptslog/receiptsManager_test.go new file mode 100644 index 00000000000..528914c3a38 --- /dev/null +++ b/process/receiptslog/receiptsManager_test.go @@ -0,0 +1,70 @@ +package receiptslog + +import ( + "context" + "testing" + + "github.com/multiversx/mx-chain-core-go/data/state" + "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/process/mock" + "github.com/multiversx/mx-chain-go/testscommon" + "github.com/stretchr/testify/require" +) + +func TestReceiptsManager_SyncReceiptsTrie(t *testing.T) { + rHash1 := []byte("hash1") + rHash2 := []byte("hash2") + + rec1 := state.Receipt{ + TxHash: rHash1, + } + rec2 := state.Receipt{ + TxHash: rHash2, + } + + storer := mock.NewStorerMock() + args := createArgsTrieInteractor() + args.Marshaller = &marshal.GogoProtoMarshalizer{} + args.ReceiptDataStorer = storer + + interactor, err := NewTrieInteractor(args) + require.Nil(t, err) + + err = interactor.CreateNewTrie() + require.Nil(t, err) + err = interactor.AddReceiptData(rec1) + require.Nil(t, err) + err = interactor.AddReceiptData(rec2) + require.Nil(t, err) + + receiptsTrieHash, err := interactor.Save() + require.Nil(t, err) + + resultsMap := make(map[string][]byte) + + manager, err := NewReceiptsManager(ArgsReceiptsManager{ + TrieHandler: interactor, + ReceiptsDataSyncer: &testscommon.ReceiptsDataSyncerStub{ + SyncReceiptsDataForCalled: func(hashes [][]byte, ctx context.Context) error { + resultsMap = make(map[string][]byte) + for _, hash := range hashes { + res, errGet := storer.Get(hash) + require.Nil(t, errGet) + + resultsMap[string(hash)] = res + } + + return nil + }, + GetReceiptsDataCalled: func() (map[string][]byte, error) { + return resultsMap, nil + }, + }, + Marshaller: args.Marshaller, + Hasher: args.Hasher, + }) + require.Nil(t, err) + + err = manager.SyncReceiptsTrie(receiptsTrieHash) + require.Nil(t, err) +} diff --git a/process/receiptslog/storageManager.go b/process/receiptslog/storageManager.go new file mode 100644 index 00000000000..ffded406653 --- /dev/null +++ b/process/receiptslog/storageManager.go @@ -0,0 +1,124 @@ +package receiptslog + +import ( + "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/statistics/disabled" + "github.com/multiversx/mx-chain-go/storage" +) + +type storageManagerOnlyGet struct { + db storage.Persister +} + +// NewStorageManagerOnlyGet will create a new instance of storageManagerOnlyGet +func NewStorageManagerOnlyGet(db storage.Persister) (*storageManagerOnlyGet, error) { + return &storageManagerOnlyGet{ + db: db, + }, nil +} + +// Put - +func (s storageManagerOnlyGet) Put(_, _ []byte) error { + return nil +} + +// Get will return the data from local db based on the provided key +func (s storageManagerOnlyGet) Get(key []byte) ([]byte, error) { + return s.db.Get(key) +} + +// Remove - +func (s storageManagerOnlyGet) Remove(_ []byte) error { + return nil +} + +// Close - +func (s storageManagerOnlyGet) Close() error { + return nil +} + +// IsInterfaceNil - +func (s storageManagerOnlyGet) IsInterfaceNil() bool { + return false +} + +// PutInEpoch - +func (s storageManagerOnlyGet) PutInEpoch(_ []byte, _ []byte, _ uint32) error { + return nil +} + +// GetIdentifier - +func (s storageManagerOnlyGet) GetIdentifier() string { + return "" +} + +// GetStateStatsHandler - +func (s storageManagerOnlyGet) GetStateStatsHandler() common.StateStatisticsHandler { + return disabled.NewStateStatistics() +} + +// GetFromCurrentEpoch - +func (s storageManagerOnlyGet) GetFromCurrentEpoch(_ []byte) ([]byte, error) { + return nil, nil +} + +// PutInEpochWithoutCache - +func (s storageManagerOnlyGet) PutInEpochWithoutCache(_ []byte, _ []byte, _ uint32) error { + return nil +} + +// TakeSnapshot - +func (s storageManagerOnlyGet) TakeSnapshot(_ string, _ []byte, _ []byte, _ *common.TrieIteratorChannels, _ chan []byte, _ common.SnapshotStatisticsHandler, _ uint32) { +} + +// GetLatestStorageEpoch - +func (s storageManagerOnlyGet) GetLatestStorageEpoch() (uint32, error) { + return 0, nil +} + +// IsPruningEnabled - +func (s storageManagerOnlyGet) IsPruningEnabled() bool { + return false +} + +// IsPruningBlocked - +func (s storageManagerOnlyGet) IsPruningBlocked() bool { + return false +} + +// EnterPruningBufferingMode - +func (s storageManagerOnlyGet) EnterPruningBufferingMode() { +} + +// ExitPruningBufferingMode - +func (s storageManagerOnlyGet) ExitPruningBufferingMode() { +} + +// RemoveFromAllActiveEpochs - +func (s storageManagerOnlyGet) RemoveFromAllActiveEpochs(_ []byte) error { + return nil +} + +// SetEpochForPutOperation - +func (s storageManagerOnlyGet) SetEpochForPutOperation(_ uint32) { +} + +// ShouldTakeSnapshot - +func (s storageManagerOnlyGet) ShouldTakeSnapshot() bool { + return false +} + +// IsSnapshotSupported - +func (s storageManagerOnlyGet) IsSnapshotSupported() bool { + return false +} + +// GetBaseTrieStorageManager - +func (s storageManagerOnlyGet) GetBaseTrieStorageManager() common.StorageManager { + return nil +} + +// IsClosed - +func (s storageManagerOnlyGet) IsClosed() bool { + return false +} diff --git a/process/receiptslog/trieInteractor.go b/process/receiptslog/trieInteractor.go index 34a6759e551..2e2af473c02 100644 --- a/process/receiptslog/trieInteractor.go +++ b/process/receiptslog/trieInteractor.go @@ -6,10 +6,11 @@ import ( "github.com/multiversx/mx-chain-core-go/hashing" "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/common/holders" "github.com/multiversx/mx-chain-go/dataRetriever" "github.com/multiversx/mx-chain-go/process" "github.com/multiversx/mx-chain-go/storage" - "github.com/multiversx/mx-chain-go/testscommon/storageManager" + "github.com/multiversx/mx-chain-go/storage/database" "github.com/multiversx/mx-chain-go/trie" ) @@ -49,7 +50,10 @@ func NewTrieInteractor(args ArgsTrieInteractor) (*trieInteractor, error) { // CreateNewTrie will create a new local trie(also will overwrite the old local trie) func (ti *trieInteractor) CreateNewTrie() error { - disabledStorageManager := &storageManager.StorageManagerStub{} + disabledStorageManager, err := NewStorageManagerOnlyGet(database.NewMemDB()) + if err != nil { + return err + } localTrie, err := trie.NewTrie(disabledStorageManager, ti.marshaller, ti.hasher, ti.enableEpochsHandler, maxTrieLevelInMemory) if err != nil { @@ -61,9 +65,16 @@ func (ti *trieInteractor) CreateNewTrie() error { return nil } +// SaveNewTrie will save in storage the synced trie +func (ti *trieInteractor) SaveNewTrie(localTrie common.Trie) ([]byte, error) { + ti.localTrie = localTrie + + return ti.Save() +} + // AddReceiptData will add receipt data in local trie func (ti *trieInteractor) AddReceiptData(receiptData state.Receipt) error { - receiptDataBytes, err := ti.marshaller.Marshal(receiptData) + receiptDataBytes, err := ti.marshaller.Marshal(&receiptData) if err != nil { return err } @@ -83,8 +94,8 @@ func (ti *trieInteractor) Save() ([]byte, error) { return nil, errGet } - serializedNodes := make([][]byte, 0) - serializedNodes, err = ti.saveNodeData(currentNodeData, serializedNodes) + serializedNodes := state.NewSerializedNodesMap() + err = ti.saveNodeData(currentNodeData, serializedNodes) if err != nil { return nil, err } @@ -100,14 +111,14 @@ func (ti *trieInteractor) Save() ([]byte, error) { return nil, errGet } - serializedNodes, err = ti.saveNodeData(currentNodeData, serializedNodes) + err = ti.saveNodeData(currentNodeData, serializedNodes) if err != nil { return nil, err } } - listOfSerializedNodesBytes, err := ti.marshaller.Marshal(&serializedNodes) + listOfSerializedNodesBytes, err := ti.marshaller.Marshal(serializedNodes) if err != nil { return nil, err } @@ -125,6 +136,11 @@ func (ti *trieInteractor) Save() ([]byte, error) { return receiptTrieRootHash, nil } +// GetSerializedNode will return the serialized node with the provided hash +func (ti *trieInteractor) GetSerializedNode(nodeHash []byte) ([]byte, error) { + return ti.storage.Get(nodeHash) +} + func (ti *trieInteractor) saveReceiptTxHashLeafKey(leafHash []byte, leafData []byte) error { receiptData := &state.Receipt{} err := ti.marshaller.Unmarshal(receiptData, leafData) @@ -135,11 +151,6 @@ func (ti *trieInteractor) saveReceiptTxHashLeafKey(leafHash []byte, leafData []b return ti.storage.Put(receiptData.TxHash, leafHash) } -// IsInterfaceNil returns true if there is no value under the interface -func (ti *trieInteractor) IsInterfaceNil() bool { - return ti == nil -} - func checkArgs(args ArgsTrieInteractor) error { if check.IfNil(args.EnableEpochsHandler) { return process.ErrNilEnableEpochsHandler @@ -157,21 +168,37 @@ func checkArgs(args ArgsTrieInteractor) error { return nil } -func (ti *trieInteractor) saveNodeData(currentNodeData *trie.CurrentNodeInfo, serializedNodes [][]byte) ([][]byte, error) { - if currentNodeData.Type != trie.LeafNodeType { - serializedNodes = append(serializedNodes, currentNodeData.SerializedNode) - return serializedNodes, nil +// RecreateTrieFromDB will recreate the trie from the provided storer +func (ti *trieInteractor) RecreateTrieFromDB(rootHash []byte, db storage.Persister) (common.Trie, error) { + storageManager, err := NewStorageManagerOnlyGet(db) + if err != nil { + return nil, err } - err := ti.storage.Put(currentNodeData.Hash, currentNodeData.SerializedNode) + localTrie, err := trie.NewTrie(storageManager, ti.marshaller, ti.hasher, ti.enableEpochsHandler, maxTrieLevelInMemory) if err != nil { return nil, err } - err = ti.saveReceiptTxHashLeafKey(currentNodeData.Hash, currentNodeData.Value) + rootHashHolder := holders.NewDefaultRootHashesHolder(rootHash) + return localTrie.Recreate(rootHashHolder) +} + +func (ti *trieInteractor) saveNodeData(currentNodeData *trie.CurrentNodeInfo, serializedNodes *state.SerializedNodeMap) error { + if currentNodeData.Type != trie.LeafNodeType { + serializedNodes.SerializedNodes[string(currentNodeData.Hash)] = currentNodeData.SerializedNode + return nil + } + + err := ti.storage.Put(currentNodeData.Hash, currentNodeData.SerializedNode) if err != nil { - return nil, err + return err } - return serializedNodes, nil + return ti.saveReceiptTxHashLeafKey(currentNodeData.Hash, currentNodeData.Value) +} + +// IsInterfaceNil returns true if there is no value under the interface +func (ti *trieInteractor) IsInterfaceNil() bool { + return ti == nil } diff --git a/process/receiptslog/trieRecreate.go b/process/receiptslog/trieRecreate.go deleted file mode 100644 index b9ecf0f2ba0..00000000000 --- a/process/receiptslog/trieRecreate.go +++ /dev/null @@ -1 +0,0 @@ -package receiptslog diff --git a/testscommon/realConfigsHandling.go b/testscommon/realConfigsHandling.go index e58b36923f8..6fcec3182da 100644 --- a/testscommon/realConfigsHandling.go +++ b/testscommon/realConfigsHandling.go @@ -8,6 +8,7 @@ import ( "github.com/multiversx/mx-chain-go/common" "github.com/multiversx/mx-chain-go/config" + p2pConfig "github.com/multiversx/mx-chain-go/p2p/config" ) // CreateTestConfigs will try to copy the whole configs directory to a temp directory and return the configs after load @@ -112,8 +113,9 @@ func CreateTestConfigs(tempDir string, originalConfigsPath string) (*config.Conf SmartContracts: newGenesisSmartContractsFilename, ValidatorKey: "validatorKey.pem", }, - EpochConfig: epochConfig, - RoundConfig: roundConfig, + EpochConfig: epochConfig, + RoundConfig: roundConfig, + LightClientP2pConfig: &p2pConfig.P2PConfig{}, }, nil } diff --git a/testscommon/receiptsDataSyncerStub.go b/testscommon/receiptsDataSyncerStub.go new file mode 100644 index 00000000000..9c3476bdeaf --- /dev/null +++ b/testscommon/receiptsDataSyncerStub.go @@ -0,0 +1,35 @@ +package testscommon + +import "context" + +// ReceiptsDataSyncerStub - +type ReceiptsDataSyncerStub struct { + GetReceiptsDataCalled func() (map[string][]byte, error) + SyncReceiptsDataForCalled func(hashes [][]byte, ctx context.Context) error +} + +// SyncReceiptsDataFor - +func (r *ReceiptsDataSyncerStub) SyncReceiptsDataFor(hashes [][]byte, ctx context.Context) error { + if r.SyncReceiptsDataForCalled != nil { + return r.SyncReceiptsDataForCalled(hashes, ctx) + } + return nil +} + +// GetReceiptsData - +func (r *ReceiptsDataSyncerStub) GetReceiptsData() (map[string][]byte, error) { + if r.GetReceiptsDataCalled != nil { + return r.GetReceiptsDataCalled() + } + return nil, nil +} + +// ClearFields - +func (r *ReceiptsDataSyncerStub) ClearFields() { + +} + +// IsInterfaceNil - +func (r *ReceiptsDataSyncerStub) IsInterfaceNil() bool { + return r == nil +} diff --git a/testscommon/requestHandlerStub.go b/testscommon/requestHandlerStub.go index 395e78e3100..4ff6f499b34 100644 --- a/testscommon/requestHandlerStub.go +++ b/testscommon/requestHandlerStub.go @@ -24,6 +24,10 @@ type RequestHandlerStub struct { RequestValidatorsInfoCalled func(hashes [][]byte) } +// RequestReceiptsTrieNodes - +func (rhs *RequestHandlerStub) RequestReceiptsTrieNodes(_ [][]byte) { +} + // SetNumPeersToQuery - func (rhs *RequestHandlerStub) SetNumPeersToQuery(key string, intra int, cross int) error { if rhs.SetNumPeersToQueryCalled != nil { diff --git a/trie/branchNode.go b/trie/branchNode.go index c4d7095ff39..a2bb373fd8f 100644 --- a/trie/branchNode.go +++ b/trie/branchNode.go @@ -999,6 +999,18 @@ func (*branchNode) getType() string { return BranchNodeType } +func (bn *branchNode) getChildrenHashes() [][]byte { + hashes := make([][]byte, 0, len(bn.EncodedChildren)) + for _, hash := range bn.EncodedChildren { + if len(hash) == 0 { + continue + } + hashes = append(hashes, hash) + } + + return hashes +} + // IsInterfaceNil returns true if there is no value under the interface func (bn *branchNode) IsInterfaceNil() bool { return bn == nil diff --git a/trie/extensionNode.go b/trie/extensionNode.go index 5cbd2ce5d3b..027b4e91f4d 100644 --- a/trie/extensionNode.go +++ b/trie/extensionNode.go @@ -791,6 +791,14 @@ func (*extensionNode) getType() string { return ExtensionNodeType } +func (en *extensionNode) getChildrenHashes() [][]byte { + if len(en.EncodedChild) == 0 { + return [][]byte{} + } + + return [][]byte{en.EncodedChild} +} + // IsInterfaceNil returns true if there is no value under the interface func (en *extensionNode) IsInterfaceNil() bool { return en == nil diff --git a/trie/interface.go b/trie/interface.go index b32cde2a9e4..a71459c8d05 100644 --- a/trie/interface.go +++ b/trie/interface.go @@ -56,6 +56,7 @@ type node interface { sizeInBytes() int collectStats(handler common.TrieStatisticsHandler, depthLevel int, db common.TrieStorageInteractor) error getType() string + getChildrenHashes() [][]byte IsInterfaceNil() bool } diff --git a/trie/leafNode.go b/trie/leafNode.go index 9800c0edf09..bc7907b46a4 100644 --- a/trie/leafNode.go +++ b/trie/leafNode.go @@ -565,6 +565,10 @@ func (*leafNode) getType() string { return LeafNodeType } +func (*leafNode) getChildrenHashes() [][]byte { + return [][]byte{} +} + // IsInterfaceNil returns true if there is no value under the interface func (ln *leafNode) IsInterfaceNil() bool { return ln == nil diff --git a/trie/receiptsTrie.go b/trie/receiptsTrie.go new file mode 100644 index 00000000000..fd1bf2587a2 --- /dev/null +++ b/trie/receiptsTrie.go @@ -0,0 +1,51 @@ +package trie + +import ( + "github.com/multiversx/mx-chain-core-go/hashing" + "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/storage" +) + +// GetLeafHashesAndPutNodesInRamStorage will return the leaf node hashes and put the rest of nodes in a storer +func GetLeafHashesAndPutNodesInRamStorage( + branchNodesMap map[string][]byte, + db storage.Persister, + hasher hashing.Hasher, + marshaller marshal.Marshalizer, +) ([][]byte, error) { + leafHashes := make([][]byte, 0) + for nodeHash, branchNodeSerialized := range branchNodesMap { + decodedNode, err := decodeNode(branchNodeSerialized, marshaller, hasher) + if err != nil { + return nil, err + } + + childrenHashes := decodedNode.getChildrenHashes() + if len(childrenHashes) == 0 { + continue + } + + leafHashes = append(leafHashes, getLeafHashesFromChildrenHashes(childrenHashes, branchNodesMap)...) + + err = db.Put([]byte(nodeHash), branchNodeSerialized) + if err != nil { + return nil, err + } + } + + return leafHashes, nil +} + +func getLeafHashesFromChildrenHashes(childrenHashes [][]byte, nodesMap map[string][]byte) [][]byte { + leafHashes := make([][]byte, 0) + for _, childHash := range childrenHashes { + _, isBranchNodeOrExtensionNode := nodesMap[string(childHash)] + if isBranchNodeOrExtensionNode { + continue + } + + leafHashes = append(leafHashes, childHash) + } + + return leafHashes +} diff --git a/update/sync/syncReceiptsData.go b/update/sync/syncReceiptsData.go new file mode 100644 index 00000000000..bafd85b9ee1 --- /dev/null +++ b/update/sync/syncReceiptsData.go @@ -0,0 +1,177 @@ +package sync + +import ( + "context" + "sync" + "time" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-go/process" + "github.com/multiversx/mx-chain-go/storage" + "github.com/multiversx/mx-chain-go/update" +) + +// ArgsNewReceiptsDataSyncer defines the arguments needed for the sycner +type ArgsNewReceiptsDataSyncer struct { + Cache storage.Cacher + RequestHandler process.RequestHandler +} + +type receiptsDataSyncer struct { + mapHashes map[string]struct{} + mapReceiptsData map[string][]byte + requestHandler process.RequestHandler + pool storage.Cacher + mutex sync.Mutex + waitTimeBetweenRequests time.Duration + chReceivedAll chan bool + stopSync bool + syncedAll bool +} + +// NewReceiptsDataSyncer will create a new instance of receipt data syncer +func NewReceiptsDataSyncer(args ArgsNewReceiptsDataSyncer) (*receiptsDataSyncer, error) { + if check.IfNil(args.Cache) { + return nil, update.ErrNilCacher + } + if check.IfNil(args.RequestHandler) { + return nil, update.ErrNilRequestHandler + } + + syncer := &receiptsDataSyncer{ + requestHandler: args.RequestHandler, + pool: args.Cache, + mutex: sync.Mutex{}, + waitTimeBetweenRequests: args.RequestHandler.RequestInterval(), + chReceivedAll: make(chan bool), + mapHashes: make(map[string]struct{}), + stopSync: true, + syncedAll: true, + } + + syncer.pool.RegisterHandler(syncer.receivedReceiptsData, core.UniqueIdentifier()) + + return syncer, nil +} + +func (rds *receiptsDataSyncer) receivedReceiptsData(receiptDataHash []byte, val interface{}) { + rds.mutex.Lock() + if rds.stopSync { + rds.mutex.Unlock() + return + } + + if _, ok := rds.mapHashes[string(receiptDataHash)]; !ok { + rds.mutex.Unlock() + return + } + + if _, ok := rds.mapReceiptsData[string(receiptDataHash)]; ok { + rds.mutex.Unlock() + return + } + + receiptDataBytes, ok := val.([]byte) + if !ok { + rds.mutex.Unlock() + return + } + + rds.mapReceiptsData[string(receiptDataHash)] = receiptDataBytes + receivedAll := len(rds.mapHashes) == len(rds.mapReceiptsData) + rds.mutex.Unlock() + if receivedAll { + rds.chReceivedAll <- true + } +} + +// SyncReceiptsDataFor syncs receipts data the provided hashes +func (rds *receiptsDataSyncer) SyncReceiptsDataFor(hashes [][]byte, ctx context.Context) error { + _ = core.EmptyChannel(rds.chReceivedAll) + + for { + rds.mutex.Lock() + rds.requestReceiptsTrieNodes(hashes) + rds.mutex.Unlock() + + select { + case <-rds.chReceivedAll: + rds.mutex.Lock() + rds.stopSync = true + rds.syncedAll = true + rds.mutex.Unlock() + return nil + case <-time.After(rds.waitTimeBetweenRequests): + rds.mutex.Lock() + log.Debug("receiptsDataSyncer.SyncReceiptsDataFor", "num nodes needed", len(hashes), "num nodes got", len(rds.mapHashes)) + rds.mutex.Unlock() + continue + case <-ctx.Done(): + rds.mutex.Lock() + rds.stopSync = true + rds.mutex.Unlock() + return update.ErrTimeIsOut + } + } +} + +func (rds *receiptsDataSyncer) requestReceiptsTrieNodes(hashes [][]byte) { + hashesToRequest := make([][]byte, 0) + for _, hash := range hashes { + _, ok := rds.mapReceiptsData[string(hash)] + if ok { + continue + } + + rds.mapHashes[string(hash)] = struct{}{} + receiptsDataBytes, ok := rds.getReceiptDataFromPool(hash) + if ok { + rds.mapReceiptsData[string(hash)] = receiptsDataBytes + continue + } + + hashesToRequest = append(hashesToRequest, hash) + } + + rds.requestHandler.RequestReceiptsTrieNodes(hashesToRequest) +} + +func (rds *receiptsDataSyncer) getReceiptDataFromPool(hash []byte) ([]byte, bool) { + res, ok := rds.pool.Peek(hash) + if !ok { + return nil, false + } + + receiptDataBytes, ok := res.([]byte) + if !ok { + return nil, false + } + + return receiptDataBytes, true +} + +// GetReceiptsData returns the synced receipts data +func (rds *receiptsDataSyncer) GetReceiptsData() (map[string][]byte, error) { + rds.mutex.Lock() + defer rds.mutex.Unlock() + + if !rds.syncedAll { + return nil, update.ErrNotSynced + } + + return rds.mapReceiptsData, nil +} + +// ClearFields will clear all the maps +func (rds *receiptsDataSyncer) ClearFields() { + rds.mutex.Lock() + rds.mapHashes = make(map[string]struct{}) + rds.mapReceiptsData = make(map[string][]byte) + rds.mutex.Unlock() +} + +// IsInterfaceNil returns true if underlying object is nil +func (rds *receiptsDataSyncer) IsInterfaceNil() bool { + return rds == nil +}