Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[lite-client] Interceptor and recreate trie #6484

Open
wants to merge 15 commits into
base: feat/light-client
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dataRetriever/requestHandlers/requestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func (rrh *resolverRequestHandler) RequestTransaction(destShardID uint32, txHash
rrh.requestByHashes(destShardID, txHashes, factory.TransactionTopic, uniqueTxSuffix)
}

// RequestReceipts method ask for receipts from the connected peers
func (rrh *resolverRequestHandler) RequestReceipts(receiptsHashes [][]byte) {
// RequestReceiptsTrieNodes method ask for receipts from the connected peers
func (rrh *resolverRequestHandler) RequestReceiptsTrieNodes(receiptsHashes [][]byte) {
rrh.requestByHashes(rrh.shardID, receiptsHashes, factory.ReceiptTopic, uniqueReceiptSuffix)
}

Expand Down
6 changes: 6 additions & 0 deletions epochStart/bootstrap/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,9 @@ func TestCreateSyncers(t *testing.T) {
HeartbeatsCalled: func() storage.Cacher {
return testscommon.NewCacherStub()
},
ReceiptsCalled: func() storage.Cacher {
return testscommon.NewCacherStub()
},
}
epochStartProvider.whiteListHandler = &testscommon.WhiteListHandlerStub{}
epochStartProvider.whiteListerVerifiedTxs = &testscommon.WhiteListHandlerStub{}
Expand Down Expand Up @@ -2394,6 +2397,9 @@ func TestSyncSetGuardianTransaction(t *testing.T) {
HeartbeatsCalled: func() storage.Cacher {
return testscommon.NewCacherStub()
},
ReceiptsCalled: func() storage.Cacher {
return testscommon.NewCacherStub()
},
}
epochStartProvider.whiteListHandler = &testscommon.WhiteListHandlerStub{
IsWhiteListedCalled: func(interceptedData process.InterceptedData) bool {
Expand Down
4 changes: 4 additions & 0 deletions genesis/process/disabled/requestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import "time"
type RequestHandler struct {
}

// RequestReceiptsTrieNodes -
func (r *RequestHandler) RequestReceiptsTrieNodes(_ [][]byte) {
}

// SetEpoch does nothing
func (r *RequestHandler) SetEpoch(_ uint32) {
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.5
github.com/mitchellh/mapstructure v1.5.0
github.com/multiversx/mx-chain-communication-go v1.1.0
github.com/multiversx/mx-chain-core-go v1.2.22-0.20240821080303-1673ce1a0fd8
github.com/multiversx/mx-chain-core-go v1.2.23-0.20240924080818-9cfce0926498
github.com/multiversx/mx-chain-crypto-go v1.2.12
github.com/multiversx/mx-chain-es-indexer-go v1.7.4
github.com/multiversx/mx-chain-logger-go v1.0.15
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,8 @@ github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUY
github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o=
github.com/multiversx/mx-chain-communication-go v1.1.0 h1:J7bX6HoN3HiHY7cUeEjG8AJWgQDDPcY+OPDOsSUOkRE=
github.com/multiversx/mx-chain-communication-go v1.1.0/go.mod h1:WK6bP4pGEHGDDna/AYRIMtl6G9OA0NByI1Lw8PmOnRM=
github.com/multiversx/mx-chain-core-go v1.2.22-0.20240821080303-1673ce1a0fd8 h1:zlm+Brw3jPj5lQ8m7wGLwYWD4ebWArGd5qE5/99qIWQ=
github.com/multiversx/mx-chain-core-go v1.2.22-0.20240821080303-1673ce1a0fd8/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE=
github.com/multiversx/mx-chain-core-go v1.2.23-0.20240924080818-9cfce0926498 h1:BBBAzI/Mg6FVvxi/YHbUbFM8teVGX+MxsHdOvbXd8zg=
github.com/multiversx/mx-chain-core-go v1.2.23-0.20240924080818-9cfce0926498/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE=
github.com/multiversx/mx-chain-crypto-go v1.2.12 h1:zWip7rpUS4CGthJxfKn5MZfMfYPjVjIiCID6uX5BSOk=
github.com/multiversx/mx-chain-crypto-go v1.2.12/go.mod h1:HzcPpCm1zanNct/6h2rIh+MFrlXbjA5C8+uMyXj3LI4=
github.com/multiversx/mx-chain-es-indexer-go v1.7.4 h1:SjJk9G9SN8baz0sFIU2jymYCfx3XiikGEB2wW0jwvfw=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,50 @@ func createTopicAndAssignHandlerOnMessenger(
return messenger.RegisterMessageProcessor(topic, common.DefaultInterceptorsIdentifier, interceptor)
}

func (bicf *baseInterceptorsContainerFactory) generateReceiptNodeInterceptor() error {
shardC := bicf.shardCoordinator

keys := make([]string, 0)
interceptorsSlice := make([]process.Interceptor, 0)

receiptNodesInterceptorProcessor, err := processor.NewReceiptsNodesInterceptorProcessor(bicf.dataPool.Receipts())
if err != nil {
return err
}

interceptedTrieNodeFactory, err := interceptorFactory.NewInterceptedTrieNodeDataFactory(bicf.argInterceptorFactory)
if err != nil {
return err
}

internalMarshaller := bicf.argInterceptorFactory.CoreComponents.InternalMarshalizer()
topic := factory.ReceiptTopic + shardC.CommunicationIdentifier(core.MetachainShardId)
multiDataInterceptor, err := interceptors.NewMultiDataInterceptor(interceptors.ArgMultiDataInterceptor{
Topic: topic,
Marshalizer: internalMarshaller,
DataFactory: interceptedTrieNodeFactory,
Processor: receiptNodesInterceptorProcessor,
Throttler: bicf.globalThrottler,
AntifloodHandler: bicf.antifloodHandler,
WhiteListRequest: bicf.whiteListHandler,
CurrentPeerId: bicf.mainMessenger.ID(),
PreferredPeersHolder: bicf.preferredPeersHolder,
})
if err != nil {
return err
}

interceptor, err := bicf.createTopicAndAssignHandler(topic, multiDataInterceptor, true)
if err != nil {
return err
}

keys = append(keys, topic)
interceptorsSlice = append(interceptorsSlice, interceptor)

return bicf.addInterceptorsToContainers(keys, interceptorsSlice)
}

// ------- Tx interceptors

func (bicf *baseInterceptorsContainerFactory) generateTxInterceptors() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (micf *metaInterceptorsContainerFactory) Create() (process.InterceptorsCont
return nil, nil, err
}

err = micf.generateReceiptNodeInterceptor()
if err != nil {
return nil, nil, err
}

return micf.mainContainer, micf.fullArchiveContainer, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func createMetaDataPools() dataRetriever.PoolsHolder {
RewardTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
return testscommon.NewShardedDataStub()
},
ReceiptsCalled: func() storage.Cacher {
return testscommon.NewCacherStub()
},
}

return pools
Expand Down Expand Up @@ -605,10 +608,11 @@ func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) {
numInterceptorsHeartbeatForMetachain := 1
numInterceptorsShardValidatorInfoForMetachain := 1
numInterceptorValidatorInfo := 1
numInterceptorReceiptsData := 1
totalInterceptors := numInterceptorsMetablock + numInterceptorsShardHeadersForMetachain + numInterceptorsTrieNodes +
numInterceptorsTransactionsForMetachain + numInterceptorsUnsignedTxsForMetachain + numInterceptorsMiniBlocksForMetachain +
numInterceptorsRewardsTxsForMetachain + numInterceptorsPeerAuthForMetachain + numInterceptorsHeartbeatForMetachain +
numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo
numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo + numInterceptorReceiptsData

assert.Nil(t, err)
assert.Equal(t, totalInterceptors, mainContainer.Len())
Expand Down Expand Up @@ -655,10 +659,11 @@ func TestMetaInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) {
numInterceptorsHeartbeatForMetachain := 1
numInterceptorsShardValidatorInfoForMetachain := 1
numInterceptorValidatorInfo := 1
numInterceptorReceiptsData := 1
totalInterceptors := numInterceptorsMetablock + numInterceptorsShardHeadersForMetachain + numInterceptorsTrieNodes +
numInterceptorsTransactionsForMetachain + numInterceptorsUnsignedTxsForMetachain + numInterceptorsMiniBlocksForMetachain +
numInterceptorsRewardsTxsForMetachain + numInterceptorsPeerAuthForMetachain + numInterceptorsHeartbeatForMetachain +
numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo
numInterceptorsShardValidatorInfoForMetachain + numInterceptorValidatorInfo + numInterceptorReceiptsData

assert.Nil(t, err)
assert.Equal(t, totalInterceptors, mainContainer.Len())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ func (sicf *shardInterceptorsContainerFactory) Create() (process.InterceptorsCon
return nil, nil, err
}

err = sicf.generateReceiptNodeInterceptor()
if err != nil {
return nil, nil, err
}

return sicf.mainContainer, sicf.fullArchiveContainer, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func createShardDataPools() dataRetriever.PoolsHolder {
pools.CurrBlockTxsCalled = func() dataRetriever.TransactionCacher {
return &mock.TxForCurrentBlockStub{}
}
pools.ReceiptsCalled = func() storage.Cacher {
return testscommon.NewCacherStub()
}
return pools
}

Expand Down Expand Up @@ -610,9 +613,11 @@ func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) {
numInterceptorHeartbeat := 1
numInterceptorsShardValidatorInfo := 1
numInterceptorValidatorInfo := 1
numInterceptorReceiptsData := 1
totalInterceptors := numInterceptorTxs + numInterceptorsUnsignedTxs + numInterceptorsRewardTxs +
numInterceptorHeaders + numInterceptorMiniBlocks + numInterceptorMetachainHeaders + numInterceptorTrieNodes +
numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo
numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo +
numInterceptorReceiptsData

assert.Nil(t, err)
assert.Equal(t, totalInterceptors, mainContainer.Len())
Expand Down Expand Up @@ -658,9 +663,11 @@ func TestShardInterceptorsContainerFactory_With4ShardsShouldWork(t *testing.T) {
numInterceptorHeartbeat := 1
numInterceptorsShardValidatorInfo := 1
numInterceptorValidatorInfo := 1
numInterceptorReceiptsData := 1
totalInterceptors := numInterceptorTxs + numInterceptorsUnsignedTxs + numInterceptorsRewardTxs +
numInterceptorHeaders + numInterceptorMiniBlocks + numInterceptorMetachainHeaders + numInterceptorTrieNodes +
numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo
numInterceptorPeerAuth + numInterceptorHeartbeat + numInterceptorsShardValidatorInfo + numInterceptorValidatorInfo +
numInterceptorReceiptsData

assert.Nil(t, err)
assert.Equal(t, totalInterceptors, mainContainer.Len())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package processor

import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/storage"
)

var _ process.InterceptorProcessor = (*receiptsNodeInterceptorProcessor)(nil)

// receiptsNodeInterceptorProcessor is the processor used when intercepting receipts nodes
type receiptsNodeInterceptorProcessor struct {
interceptedNodes storage.Cacher
}

// NewReceiptsNodesInterceptorProcessor creates a new instance of ReceiptsNodeInterceptorProcessor
func NewReceiptsNodesInterceptorProcessor(interceptedNodes storage.Cacher) (*receiptsNodeInterceptorProcessor, error) {
if check.IfNil(interceptedNodes) {
return nil, process.ErrNilCacher
}

return &receiptsNodeInterceptorProcessor{
interceptedNodes: interceptedNodes,
}, nil
}

// Validate checks if the intercepted data can be processed
func (p *receiptsNodeInterceptorProcessor) Validate(_ process.InterceptedData, _ core.PeerID) error {
return nil
}

// Save saves the intercepted trie node in the intercepted nodes cacher
func (p *receiptsNodeInterceptorProcessor) Save(data process.InterceptedData, _ core.PeerID, _ string) error {
nodeData, ok := data.(interceptedDataSizeHandler)
if !ok {
return process.ErrWrongTypeAssertion
}

p.interceptedNodes.Put(data.Hash(), nodeData, nodeData.SizeInBytes()+len(data.Hash()))
return nil
}

// RegisterHandler registers a callback function to be notified of incoming trie nodes
func (p *receiptsNodeInterceptorProcessor) RegisterHandler(_ func(topic string, hash []byte, data interface{})) {
log.Error("ReceiptsNodeInterceptorProcessor.RegisterHandler", "error", "not implemented")
}

// IsInterfaceNil returns true if there is no value under the interface
func (p *receiptsNodeInterceptorProcessor) IsInterfaceNil() bool {
return p == nil
}
1 change: 1 addition & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ type RequestHandler interface {
RequestTrieNode(requestHash []byte, topic string, chunkIndex uint32)
CreateTrieNodeIdentifier(requestHash []byte, chunkIndex uint32) []byte
RequestPeerAuthenticationsByHashes(destShardID uint32, hashes [][]byte)
RequestReceiptsTrieNodes(receiptsHashes [][]byte)
RequestValidatorInfo(hash []byte)
RequestValidatorsInfo(hashes [][]byte)
IsInterfaceNil() bool
Expand Down
3 changes: 3 additions & 0 deletions process/receiptslog/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ import "errors"

// ErrNilTrieInteractor signals that a nil trie interactor has been provided
var ErrNilTrieInteractor = errors.New("trie interactor is nil")

// ErrNilReceiptsDataSyncer signals that a nil receipt data syncer has been provided
var ErrNilReceiptsDataSyncer = errors.New("receipts data syncer is nil")
18 changes: 17 additions & 1 deletion process/receiptslog/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package receiptslog

import "github.com/multiversx/mx-chain-core-go/data/state"
import (
"context"
"github.com/multiversx/mx-chain-core-go/data/state"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/storage"
)

// Interactor defines what a trie interactor should be able to do
type Interactor interface {
CreateNewTrie() error
AddReceiptData(receiptData state.Receipt) error
Save() ([]byte, error)
GetSerializedNode(nodeHash []byte) ([]byte, error)
RecreateTrieFromDB(rootHash []byte, db storage.Persister) (common.Trie, error)
SaveNewTrie(localTrie common.Trie) error
IsInterfaceNil() bool
}

Expand All @@ -16,3 +24,11 @@ type ReceiptsManagerHandler interface {
SyncReceiptsTrie(receiptsRootHash []byte) error
IsInterfaceNil() bool
}

// ReceiptsDataSyncer defines what a receipts data syncer should be able to do
type ReceiptsDataSyncer interface {
SyncReceiptsDataFor(hashes [][]byte, ctx context.Context) error
GetReceiptsData() (map[string][]byte, error)
ClearFields()
IsInterfaceNil() bool
}
Loading
Loading