Skip to content

Commit

Permalink
feat: PRT Block Hash Retry Archive - Part 1, redesign UsedProviders. (#…
Browse files Browse the repository at this point in the history
…1726)

* feat: PRT Block Hash Retry Archive

* fix deref

* fix lint

* remove extensions from all flows and save RouterKey in singleConsumerSession

* version merge

* rename function for better description on functionality

* give a bigger window for the test

---------

Co-authored-by: omerlavanet <omer@lavanet.xyz>
  • Loading branch information
ranlavanet and omerlavanet authored Nov 3, 2024
1 parent cc682df commit 213d0c6
Show file tree
Hide file tree
Showing 17 changed files with 216 additions and 118 deletions.
4 changes: 4 additions & 0 deletions protocol/chainlib/chain_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type baseChainMessageContainer struct {
resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
}

func (bcnc *baseChainMessageContainer) GetRequestedBlocksHashes() []string {
return bcnc.requestedBlockHashes
}

func (bcnc *baseChainMessageContainer) SubscriptionIdExtractor(reply *rpcclient.JsonrpcMessage) string {
return bcnc.msg.SubscriptionIdExtractor(reply)
}
Expand Down
1 change: 1 addition & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ChainMessage interface {
SetForceCacheRefresh(force bool) bool
CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
GetRawRequestHash() ([]byte, error)
GetRequestedBlocksHashes() []string

ChainMessageForSend
}
Expand Down
8 changes: 8 additions & 0 deletions protocol/chainlib/chainlib_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
return nil, utils.LavaFormatError("failed getting sessions from used Providers", nil, utils.LogAttr("usedProviders", usedProviders), utils.LogAttr("endpoint", csm.rpcEndpoint))
}
defer func() { usedProviders.AddUsed(consumerSessionMap, errRet) }()
initUnwantedProviders := usedProviders.GetUnwantedProvidersToSend()
routerKey := NewRouterKeyFromExtensions(extensions)
initUnwantedProviders := usedProviders.GetUnwantedProvidersToSend(routerKey)

extensionNames := common.GetExtensionNames(extensions)
// if pairing list is empty we reset the state.
Expand Down Expand Up @@ -567,7 +568,7 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
// we don't want to update the reputation by it, so we null the rawQosReport
rawQosReport = nil
}
consumerSession.SetUsageForSession(cuNeededForSession, qosReport, rawQosReport, usedProviders)
consumerSession.SetUsageForSession(cuNeededForSession, qosReport, rawQosReport, usedProviders, routerKey)
// We successfully added provider, we should ignore it if we need to fetch new
tempIgnoredProviders.providers[providerAddress] = struct{}{}
if len(sessions) == wantedSession {
Expand Down Expand Up @@ -687,6 +688,7 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP
// if we got here we validated the epoch is still the same epoch as we expected and we need to fetch a session from the blocked provider list.
defer csm.lock.RUnlock()

routerKey := NewRouterKey(extensions)
// csm.currentlyBlockedProviderAddresses is sorted by the provider with the highest cu used this epoch to the lowest
// meaning if we fetch the first successful index this is probably the highest success ratio to get a response.
for _, providerAddress := range csm.currentlyBlockedProviderAddresses {
Expand All @@ -697,7 +699,7 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP
consumerSessionsWithProvider := csm.pairing[providerAddress]
// Add to ignored (no matter what)
ignoredProviders.providers[providerAddress] = struct{}{}
usedProviders.AddUnwantedAddresses(providerAddress) // add the address to our unwanted providers to avoid infinite recursion
usedProviders.AddUnwantedAddresses(providerAddress, routerKey) // add the address to our unwanted providers to avoid infinite recursion

// validate this provider has enough cu to be used
if err := consumerSessionsWithProvider.validateComputeUnits(cuNeededForSession, virtualEpoch); err != nil {
Expand Down Expand Up @@ -1019,6 +1021,7 @@ func (csm *ConsumerSessionManager) OnSessionDone(
numOfProviders int,
providersCount uint64,
isHangingApi bool,
extensions []*spectypes.Extension,
) error {
// release locks, update CU, relaynum etc..
if err := consumerSession.VerifyLock(); err != nil {
Expand Down
24 changes: 12 additions & 12 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestHappyFlow(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -416,7 +416,7 @@ func runOnSessionDoneForConsumerSessionMap(t *testing.T, css ConsumerSessionsMap
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err := csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestHappyFlowVirtualEpoch(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, maxCuForVirtualEpoch*(virtualEpoch+1))
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch*(virtualEpoch+1), time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, maxCuForVirtualEpoch*(virtualEpoch+1))
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestPairingReset(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
require.NotNil(t, cs)
require.Equal(t, cs.Epoch, csm.currentEpoch)
require.Equal(t, cs.Session.LatestRelayCu, cuForFirstRequest)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
require.Equal(t, epoch, csm.currentEpoch)

if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, cs.CuSum, cuForFirstRequest)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestSuccessAndFailureOfSessionWithUpdatePairingsInTheMiddle(t *testing.T) {
for j := numberOfAllowedSessionsPerConsumer / 2; j < numberOfAllowedSessionsPerConsumer; j++ {
cs := sessionList[j].cs
if rand.Intn(2) > 0 {
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
require.Equal(t, sessionListData[j].cuSum+cuForFirstRequest, cs.CuSum)
require.Equal(t, cs.LatestRelayCu, latestRelayCuAfterDone)
Expand All @@ -676,7 +676,7 @@ func successfulSession(ctx context.Context, csm *ConsumerSessionManager, t *test
for _, cs := range css {
require.NotNil(t, cs)
time.Sleep(time.Duration((rand.Intn(500) + 1)) * time.Millisecond)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
ch <- p
}
Expand Down Expand Up @@ -957,7 +957,7 @@ func TestPairingWithAddons(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, addon, nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func TestPairingWithExtensions(t *testing.T) {
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, extensionOpt.addon, extensionsList, common.NO_STATE, 0) // get a session
require.NoError(t, err)
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
}
})
Expand Down Expand Up @@ -1068,11 +1068,11 @@ func TestPairingWithStateful(t *testing.T) {
require.NoError(t, err)
require.Equal(t, allProviders, len(css))
for _, cs := range css {
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), numberOfProviders, numberOfProviders, false, nil)
require.NoError(t, err)
}
usedProviders := NewUsedProviders(nil)
usedProviders.RemoveUsed(providerAddresses[0], nil)
usedProviders.RemoveUsed(providerAddresses[0], NewRouterKey(nil), nil)
css, err = csm.GetSessions(ctx, cuForFirstRequest, usedProviders, servicedBlockNumber, addon, nil, common.CONSISTENCY_SELECT_ALL_PROVIDERS, 0) // get a session
require.NoError(t, err)
require.Equal(t, allProviders-1, len(css))
Expand Down
7 changes: 4 additions & 3 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ var (
)

type UsedProvidersInf interface {
RemoveUsed(providerAddress string, err error)
RemoveUsed(providerAddress string, routerKey RouterKey, err error)
TryLockSelection(context.Context) error
AddUsed(ConsumerSessionsMap, error)
GetUnwantedProvidersToSend() map[string]struct{}
AddUnwantedAddresses(address string)
GetUnwantedProvidersToSend(RouterKey) map[string]struct{}
AddUnwantedAddresses(address string, routerKey RouterKey)
CurrentlyUsed() int
}

Expand Down Expand Up @@ -439,6 +439,7 @@ func (cswp *ConsumerSessionsWithProvider) GetConsumerSessionInstanceFromEndpoint
Parent: cswp,
EndpointConnection: endpointConnection,
StaticProvider: cswp.StaticProvider,
routerKey: NewRouterKey(nil),
}

consumerSession.TryUseSession() // we must lock the session so other requests wont get it.
Expand Down
4 changes: 2 additions & 2 deletions protocol/lavasession/end_to_end_lavasession_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHappyFlowE2EEmergency(t *testing.T) {
err = psm.OnSessionDone(sps, cs.Session.RelayNum-skippedRelays)
require.NoError(t, err)

err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, maxCuForVirtualEpoch, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, nil)
require.NoError(t, err)
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func prepareSessionsWithFirstRelay(t *testing.T, cuForFirstRequest uint64) (*Con
require.NoError(t, err)

// Consumer Side:
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false)
err = csm.OnSessionDone(cs.Session, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.Session.CalculateExpectedLatency(2*time.Millisecond), (servicedBlockNumber - 1), 1, 1, false, nil)
require.NoError(t, err)
require.Equal(t, cs.Session.CuSum, cuForFirstRequest)
require.Equal(t, cs.Session.LatestRelayCu, latestRelayCuAfterDone)
Expand Down
25 changes: 20 additions & 5 deletions protocol/lavasession/router_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"sort"
"strconv"
"strings"

spectypes "github.com/lavanet/lava/v4/x/spec/types"
)

const (
Expand All @@ -18,18 +20,31 @@ func (rk *RouterKey) ApplyMethodsRoute(routeNum int) RouterKey {
return RouterKey(string(*rk) + methodRouteSep + additionalPath)
}

func newRouterKeyInner(uniqueExtensions map[string]struct{}) RouterKey {
uniqueExtensionsSlice := []string{}
for addon := range uniqueExtensions { // we are sorting this anyway so we don't have to keep order
uniqueExtensionsSlice = append(uniqueExtensionsSlice, addon)
}
sort.Strings(uniqueExtensionsSlice)
return RouterKey(sep + strings.Join(uniqueExtensionsSlice, sep) + sep)
}

func NewRouterKey(extensions []string) RouterKey {
// make sure addons have no repetitions
uniqueExtensions := map[string]struct{}{}
for _, extension := range extensions {
uniqueExtensions[extension] = struct{}{}
}
uniqueExtensionsSlice := []string{}
for addon := range uniqueExtensions { // we are sorting this anyway so we don't have to keep order
uniqueExtensionsSlice = append(uniqueExtensionsSlice, addon)
return newRouterKeyInner(uniqueExtensions)
}

func NewRouterKeyFromExtensions(extensions []*spectypes.Extension) RouterKey {
// make sure addons have no repetitions
uniqueExtensions := map[string]struct{}{}
for _, extension := range extensions {
uniqueExtensions[extension.Name] = struct{}{}
}
sort.Strings(uniqueExtensionsSlice)
return RouterKey(sep + strings.Join(uniqueExtensionsSlice, sep) + sep)
return newRouterKeyInner(uniqueExtensions)
}

func GetEmptyRouterKey() RouterKey {
Expand Down
17 changes: 10 additions & 7 deletions protocol/lavasession/single_consumer_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ type SingleConsumerSession struct {
BlockListed bool // if session lost sync we blacklist it.
ConsecutiveErrors []error
errorsCount uint64
relayProcessor UsedProvidersInf
usedProviders UsedProvidersInf
providerUniqueId string
StaticProvider bool
routerKey RouterKey
}

// returns the expected latency to a threshold.
Expand Down Expand Up @@ -103,23 +104,25 @@ func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Dura
}
}

func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, rawQoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf) error {
func (scs *SingleConsumerSession) SetUsageForSession(cuNeededForSession uint64, qoSExcellenceReport *pairingtypes.QualityOfServiceReport, rawQoSExcellenceReport *pairingtypes.QualityOfServiceReport, usedProviders UsedProvidersInf, routerKey RouterKey) error {
scs.LatestRelayCu = cuNeededForSession // set latestRelayCu
scs.RelayNum += RelayNumberIncrement // increase relayNum
if scs.RelayNum > 1 {
// we only set excellence for sessions with more than one successful relays, this guarantees data within the epoch exists
scs.QoSInfo.LastExcellenceQoSReport = qoSExcellenceReport
scs.QoSInfo.LastExcellenceQoSReportRaw = rawQoSExcellenceReport
}
scs.relayProcessor = usedProviders
scs.usedProviders = usedProviders
scs.routerKey = routerKey
return nil
}

func (scs *SingleConsumerSession) Free(err error) {
if scs.relayProcessor != nil {
scs.relayProcessor.RemoveUsed(scs.Parent.PublicLavaAddress, err)
scs.relayProcessor = nil
if scs.usedProviders != nil {
scs.usedProviders.RemoveUsed(scs.Parent.PublicLavaAddress, scs.routerKey, err)
scs.usedProviders = nil
}
scs.routerKey = NewRouterKey(nil)
scs.EndpointConnection.decreaseSessionUsingConnection()
scs.lock.Unlock()
}
Expand All @@ -130,7 +133,7 @@ func (session *SingleConsumerSession) TryUseSession() (blocked bool, ok bool) {
session.lock.Unlock()
return true, false
}
if session.relayProcessor != nil {
if session.usedProviders != nil {
utils.LavaFormatError("session misuse detected, usedProviders isn't nil, missing Free call, blocking", nil, utils.LogAttr("session", session.SessionId))
session.BlockListed = true
session.lock.Unlock()
Expand Down
Loading

0 comments on commit 213d0c6

Please sign in to comment.