Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message validation optimization #6462

Open
wants to merge 34 commits into
base: feat/equivalent-messages
Choose a base branch
from

Conversation

cristure
Copy link
Contributor

Reasoning behind the pull request

  • Sometimes a message could be processed multiple times with no real benefit. Thus we need to introduce a mechanism that will check if the message has been previously processed in a set time span and exit early on if that is the case.

Proposed changes

  • Add a map of timeCachers. Before initializing every interceptor we add the timeCacher to the map. When they will try to process the message they will retrieve their cacher from the global map and check if the message has been processed before.

Testing procedure

  • TBD

Pre-requisites

Based on the Contributing Guidelines the PR author and the reviewers must check the following requirements are met:

  • was the PR targeted to the correct branch?
  • if this is a larger feature that probably needs more than one PR, is there a feat branch created?
  • if this is a feat branch merging, do all satellite projects have a proper tag inside go.mod?

WhiteListRequest: args.WhitelistHandler,
CurrentPeerId: args.Messenger.ID(),
PreferredPeersHolder: disabled.NewPreferredPeersHolder(),
ProcessedMessagesCacheMap: make(map[string]storage.Cacher),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be allocated here, but received through the arguments.

@@ -1501,6 +1501,8 @@ func (pcf *processComponentsFactory) newInterceptorContainerFactory(
nodeOperationMode = common.FullArchiveMode
}

processedMessagesCacheMap := make(map[string]storage.Cacher)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be initialized here, but received in the arguments so we can also mock it in tests.

listInterceptedData[index] = interceptedData
if err != nil {
mdi.throttler.EndProcessing()
return err
}

errCache := mdi.checkIfMessageHasBeenProcessed(interceptedData)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to check also what happens in the following scenario:

  • receive an invalid message
  • receive again the same invalid message from another source (before the cache expires)

What we want: - do not accept/propagate the message if it was initially invalidated (need to check if the validation returns true or false in this case, and we need to make sure it returns false)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could solve this if we save also the verification status inside the cache.
In this case we might still need the mutex per message key, so that we do not process it twice while not having the status inside the cache.

@@ -108,6 +110,7 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
FullArchivePeerShardMapper: fullArchivePeerShardMapper,
HardforkTrigger: hardforkTrigger,
NodeOperationMode: args.NodeOperationMode,
ProcessedMessagesCacheMap: make(map[string]storage.Cacher),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to check all intializations of this map, as we might end up with multiple versions of this.

@@ -152,12 +157,19 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
for index, dataBuff := range multiDataBuff {
var interceptedData process.InterceptedData
interceptedData, err = mdi.interceptedData(dataBuff, message.Peer(), fromConnectedPeer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like at line 159 inside mdi.interceptedData happens all the intensive processing -> signature verifications, etc.
so checking the cache at line 167 does not save much in this case.
I would suggest moving the cache check inside mdi.InterceptedData, before interceptedData.CheckValidity
Check the same for singleDataInterceptors.

return ErrInvalidInterceptedData
}

err := interceptedData.CheckValidity()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case the checkValidity lasts a long time, there can be multiple verifications being done in parallel
these can be avoided if we use a per-resource (hash) mutex.while checking the validity & writing in the cache.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hope I fixed it in the next push

@@ -152,7 +159,10 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
for index, dataBuff := range multiDataBuff {
var interceptedData process.InterceptedData
interceptedData, err = mdi.interceptedData(dataBuff, message.Peer(), fromConnectedPeer)
listInterceptedData[index] = interceptedData

if !errors.Is(err, ErrInvalidInterceptedData) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for this error handling here, as if the error is not nil then at line 167 it will anyway exit and not do anything with the listInterceptedData.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright, removed the check.

// It will retrieve the status in the cache if it exists, otherwise it will validate it and store the status of the
// validation in the cache. Note that the entries are stored for a set period of time
func (idv *interceptedDataVerifier) Verify(interceptedData process.InterceptedData) error {
if len(interceptedData.Hash()) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this early exit here for len(hash == 0) is bypassing the CheckValidity check which was previously called even if there was no hash computed.

can you return instead interceptedData.CheckValidity() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -98,7 +99,7 @@ func TestNewInterceptedMetaHeader_ShouldWork(t *testing.T) {
assert.Nil(t, err)
}

//------- CheckValidity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe wrong replace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

@@ -99,7 +100,7 @@ func (imh *InterceptedMetaHeader) CheckValidity() error {
}

if imh.isMetaHeaderEpochOutOfRange() {
log.Trace("InterceptedMetaHeader.CheckValidity",
log.Trace("InterceptedMetaHeader.Verify",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong replace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

@@ -173,7 +174,7 @@ func TestNewInterceptedHeader_MetachainForThisShardShouldWork(t *testing.T) {
assert.True(t, inHdr.IsForCurrentShard())
}

//------- CheckValidity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong replace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

@@ -29,6 +31,8 @@ const (
minTimespanDurationInSec = int64(1)
errorOnMainNetworkString = "on main network"
errorOnFullArchiveNetworkString = "on full archive network"
cacheDefaultSpan = 30 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: maybe move the two constants to config

@@ -1401,3 +1401,8 @@ type SentSignaturesTracker interface {
ResetCountersForManagedBlockSigner(signerPk []byte)
IsInterfaceNil() bool
}

type InterceptedDataVerifier interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment on exported type

@@ -612,7 +612,7 @@ func TestNewInterceptedTransaction_ShouldWork(t *testing.T) {
assert.Equal(t, tx, txi.Transaction())
}

// ------- CheckValidity
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong replace?

@@ -170,7 +171,7 @@ func TestNewInterceptedUnsignedTransaction_ShouldWork(t *testing.T) {
assert.Nil(t, err)
}

// ------- CheckValidity
// ------- Verify
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong replace?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants