[Dynamic Protocol State] Protocol state used in FollowerState and ParticipantState #4613
…m/onflow/flow-go into yurii/5514-extend-updates-protocol-state
…ents. Updated FollowerState to process service events using updater
… implementors of Snapshot to comply with new interface
…l params are read
…nd-updates-protocol-state
state/protocol/badger/mutator.go
Outdated
if hasChanges {
	err = m.protocolState.StoreTx(updatedStateID, updatedState)(tx)
	// in case of fork, the protocol state may already exist
	if err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
		return fmt.Errorf("could not store protocol state (%v): %w", updatedStateID, err)
	}
}

err = m.protocolState.Index(blockID, updatedStateID)(tx)
if err != nil {
	return fmt.Errorf("could not index protocol state (%v) for block (%v): %w",
		updatedStateID, blockID, err)
}
In my opinion, the bulk of this code should go before line
flow-go/state/protocol/badger/mutator.go
Line 539 in 79b2ebb
err = operation.RetryOnConflictTx(m.db, transaction.Update, func(tx *transaction.Tx) error {
Reasoning:
- Both ProtocolState.StoreTx as well as ProtocolState.Index return anonymous functions for later execution as part of a badger transaction. In particular, they might front-load a significant amount of computation and sanity checks as pre-processing outside of their transaction fragment.
In my mind, the code lines 526-607 could look something like this:
updatedState, updatedStateID, hasChanges := protocolStateUpdater.Build()

// TODO: check if updatedStateID corresponds to the root protocol state ID stored in payload
// if updatedStateID != payload.ProtocolStateID {
//     return state.NewInvalidExtension("invalid protocol state transition detected expected (%x) got %x", payload.ProtocolStateID, updatedStateID)
// }

if hasChanges {
	dbUpdates = append(dbUpdates, m.protocolState.StoreTx(updatedStateID, updatedState))
}
dbUpdates = append(dbUpdates, m.protocolState.Index(blockID, updatedStateID))

// events is a queue of node-internal events (aka notifications) that are emitted after the database write succeeded
var events []func()

if certifyingQC != nil {
	dbUpdates = append(dbUpdates, m.qcs.StoreTx(certifyingQC))

	// queue a BlockProcessable event for candidate block, since it is certified
	events = append(events, func() {
		m.consumer.BlockProcessable(candidate.Header, certifyingQC)
	})
}

// Both the header itself and its payload are in compliance with the protocol state.
// We can now store the candidate block, as well as adding its final seal
// to the seal index and initializing its children index.
qc := candidate.Header.QuorumCertificate()
err = operation.RetryOnConflictTx(m.db, transaction.Update, func(tx *transaction.Tx) error {
	// insert the block into the database AND cache
	err := m.blocks.StoreTx(candidate)(tx)
	if err != nil {
		return fmt.Errorf("could not store candidate block: %w", err)
	}

	err = m.qcs.StoreTx(qc)(tx)
	if err != nil {
		if !errors.Is(err, storage.ErrAlreadyExists) {
			return fmt.Errorf("could not store incorporated qc: %w", err)
		}
	} else {
		// trigger BlockProcessable for parent blocks above root height
		if parent.Height > m.finalizedRootHeight {
			events = append(events, func() {
				m.consumer.BlockProcessable(parent, qc)
			})
		}
	}

	// index the latest sealed block in this fork
	err = transaction.WithTx(operation.IndexLatestSealAtBlock(blockID, latestSealID))(tx)
	if err != nil {
		return fmt.Errorf("could not index candidate seal: %w", err)
	}

	// index the child block for recovery
	err = transaction.WithTx(procedure.IndexNewBlock(blockID, candidate.Header.ParentID))(tx)
	if err != nil {
		return fmt.Errorf("could not index new block: %w", err)
	}

	// apply any optional DB operations from service events
	for _, apply := range dbUpdates {
		err := apply(tx)
		if err != nil {
			return fmt.Errorf("could not apply operation: %w", err)
		}
	}

	return nil
})
Note: a similar argument could apply to other operations in the body of operation.RetryOnConflictTx.
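To illustrate the reasoning about deferred transaction fragments, here is a minimal, self-contained sketch. It does not use flow-go or badger; the names `tx`, `deferredOp`, `storeTx` and `runTx` are hypothetical stand-ins chosen for this example. The point it tries to show is that validation and encoding run once when the fragment is constructed, so only the cheap write remains inside the transaction body.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is a hypothetical stand-in for a database transaction.
type tx struct {
	writes map[string]string
}

// deferredOp is a transaction fragment that is executed later, inside the transaction.
type deferredOp func(*tx) error

// storeTx performs validation and encoding eagerly and returns only the write itself.
func storeTx(key, value string) deferredOp {
	// Pre-processing happens here, once, outside of the transaction.
	if key == "" {
		return func(*tx) error { return errors.New("empty key") }
	}
	encoded := fmt.Sprintf("%q", value)
	return func(t *tx) error {
		t.writes[key] = encoded
		return nil
	}
}

// runTx applies all fragments in order. An implementation that retries on
// conflict would re-run only these fragments, not the pre-processing above.
func runTx(ops []deferredOp) (map[string]string, error) {
	t := &tx{writes: make(map[string]string)}
	for _, apply := range ops {
		if err := apply(t); err != nil {
			return nil, fmt.Errorf("could not apply operation: %w", err)
		}
	}
	return t.writes, nil
}

func main() {
	// Fragments are built before the transaction starts.
	var dbUpdates []deferredOp
	dbUpdates = append(dbUpdates, storeTx("state", "v1"))
	dbUpdates = append(dbUpdates, storeTx("index", "state"))

	writes, err := runTx(dbUpdates)
	fmt.Println(writes, err)
}
```

If the transaction body is retried, anything computed inside it is computed again, which is why building the fragments up front can be attractive.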
Some further thoughts and comments ... still reviewing
- I think we have a terminology conflict with the interface ProtocolState, specifically:
  - storage.ProtocolState is the low-level storage layer abstraction for persisting / querying Snapshots of the protocol state.
  - protocol.ProtocolState is the high-level API with lots of convenience functionality for retrieving fine-grained information about the protocol state at each block

  As a result of this very similar naming, we end up with implementation patterns such as
  flow-go/state/protocol/badger/state.go
  Lines 42 to 43 in 79b2ebb
      protocolState       storage.ProtocolState
      protocolStateReader protocol.ProtocolState
  Suggestions for resolution
  - we generally name the storage layer abstractions by adding an "s" at the end. Therefore, storage.ProtocolState should become storage.ProtocolState*s*. I think even more clear would be storage.ProtocolStateSnapshots
state/protocol/badger/mutator.go
Outdated
if !epochFallbackTriggered {
	phase, err := epochStatus.Phase()
	if err != nil {
		return nil, fmt.Errorf("could not determine epoch phase: %w", err)
	}
	if phase == flow.EpochPhaseCommitted {
		if candidate.Header.View > parentProtocolState.CurrentEpochSetup.FinalView {
			err = updater.TransitionToNextEpoch()
			if err != nil {
				return nil, fmt.Errorf("could not transition protocol state to next epoch: %w", err)
			}
		}
	}
}
My gut feeling:
- It would be beneficial to avoid a change of the Protocol state at the epoch switchover.
- While this is technically permitted in our formal model, I don't think it is necessary
Suggestion:
- for this PR, I would leave the code as is and add a TODO
- This point here is already part of our issue [Dynamic Protocol State] Remaining work and ToDos #4649
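For reference when revisiting this TODO, the guard in the snippet under review can be isolated as a pure function. This is only a sketch: `epochPhase`, its constants and `shouldTransition` are hypothetical names for this example, not flow-go types.

```go
package main

import "fmt"

// epochPhase is a hypothetical stand-in for flow.EpochPhase.
type epochPhase int

const (
	phaseStaking epochPhase = iota
	phaseSetup
	phaseCommitted
)

// shouldTransition mirrors the nested conditions of the reviewed snippet:
// no epoch fallback, the next epoch is committed, and the candidate's view
// lies beyond the final view of the current epoch.
func shouldTransition(fallbackTriggered bool, phase epochPhase, candidateView, finalView uint64) bool {
	if fallbackTriggered {
		return false
	}
	return phase == phaseCommitted && candidateView > finalView
}

func main() {
	fmt.Println(shouldTransition(false, phaseCommitted, 101, 100)) // first block of the next epoch
	fmt.Println(shouldTransition(false, phaseCommitted, 100, 100)) // still inside the current epoch
	fmt.Println(shouldTransition(true, phaseCommitted, 101, 100))  // epoch fallback suppresses the transition
}
```

Having the condition in one place would make it easier to see which blocks trigger a protocol state change at the epoch switchover.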
…nflow/flow-go into yurii/5514-extend-updates-protocol-state
Codecov Report
Patch coverage:
Additional details and impacted files
@@ Coverage Diff @@
## feature/dynamic-protocol-state #4613 +/- ##
================================================================
Coverage 54.92% 54.93%
================================================================
Files 924 924
Lines 86127 86253 +126
================================================================
+ Hits 47309 47384 +75
- Misses 35216 35252 +36
- Partials 3602 3617 +15
Nice. Just a few minor revisions might further increase clarity.
storage/badger/transaction/tx.go
Outdated
// WithBadgerTx adapts a function that takes a *Tx to one that takes a *badger.Txn.
func WithBadgerTx(f func(*Tx) error) func(*dbbadger.Txn) error {
	return func(txn *dbbadger.Txn) error {
		return f(&Tx{DBTxn: txn})
	}
}
⚠️ potentially inconsistent design
I am not really sure (it's not properly documented 😭), but this is my understanding of the current Tx struct implementation:
Tx wraps a badger transaction and includes an additional slice for callbacks.
The callbacks are executed after the badger transaction completed successfully.
DESIGN PATTERN
- DBTxn should never be nil
- at initialization, callbacks is empty
- While business logic code operates on DBTxn, it can append additional callbacks via the OnSucceed method. This generally happens during the transaction execution.
CAUTION:
- Tx is stateful (calls to OnSucceed change its internal state). Therefore, Tx needs to be passed as pointer variable.
- Do not instantiate Tx outside of this package. Instead, use Update or View functions.
- Whether a transaction is considered to have succeeded depends only on the return value of the outermost function. For example, consider a chain of 3 functions: f3( f2( f1(x))). Let's assume f1 fails with a storage.ErrAlreadyExists sentinel, which f2 expects and therefore discards. f3 could then succeed, i.e. return nil. Consequently, the entire list of callbacks is executed, including f1's callback if it added one. Callback implementations therefore need to account for this edge case.
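The edge case in the last CAUTION item can be reproduced without a database. The sketch below is an assumption-laden simplification: `tx`, `onSucceed`, `run`, `f1`, `f2` and `errAlreadyExists` are hypothetical stand-ins for transaction.Tx, OnSucceed, the transaction runner and storage.ErrAlreadyExists.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyExists is a stand-in for the storage.ErrAlreadyExists sentinel.
var errAlreadyExists = errors.New("already exists")

// tx is a simplified stand-in for transaction.Tx without a real database.
type tx struct {
	callbacks []func()
}

// onSucceed registers a callback to run after the transaction succeeded.
func (t *tx) onSucceed(cb func()) { t.callbacks = append(t.callbacks, cb) }

// run executes f and fires all registered callbacks only if f returns nil.
func run(f func(*tx) error) error {
	t := &tx{}
	if err := f(t); err != nil {
		return err
	}
	for _, cb := range t.callbacks {
		cb()
	}
	return nil
}

// f1 registers a callback and then fails with the sentinel.
func f1(log *[]string) func(*tx) error {
	return func(t *tx) error {
		t.onSucceed(func() { *log = append(*log, "f1 callback") })
		return errAlreadyExists
	}
}

// f2 wraps an inner operation and tolerates the sentinel error.
func f2(inner func(*tx) error) func(*tx) error {
	return func(t *tx) error {
		if err := inner(t); err != nil && !errors.Is(err, errAlreadyExists) {
			return err
		}
		return nil
	}
}

func main() {
	var log []string
	err := run(f2(f1(&log)))
	// The outermost function returned nil, so f1's callback ran although f1 failed.
	fmt.Println(err, log)
}
```

A callback that populates a cache would therefore need to be safe to run even when the write it belongs to was skipped.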
Let's unravel this a bit:
- given an instance of Tx, the API contract is that during the execution of the wrapped
  flow-go/storage/badger/transaction/tx.go
  Line 10 in 0295799
      DBTxn *dbbadger.Txn
  flow-go/storage/badger/transaction/tx.go
  Line 11 in 0295799
      callbacks []func()
- function f is exactly such a case:
  - we run the returned anonymous function func(txn *dbbadger.Txn) error at some point later
  - during that, we instantiate a new instance of Tx
  - during its execution, function f might add elements to Tx.callbacks
  - when we reach line 59, we return f's error value:
    flow-go/storage/badger/transaction/tx.go
    Line 59 in 0295799
        return f(tx.DBTxn)
    But, any callbacks that f added to Tx are garbage collected now and never executed.
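The lost-callback scenario can be demonstrated with a small model. Everything here is hypothetical and simplified for illustration: `rawTxn` stands in for *badger.Txn, `withRawTx` has the same shape as WithBadgerTx, and `update` plays the role of the transaction runner.

```go
package main

import "fmt"

// rawTxn is a hypothetical stand-in for *badger.Txn.
type rawTxn struct{}

// tx wraps a raw transaction and collects callbacks, like transaction.Tx.
type tx struct {
	raw       *rawTxn
	callbacks []func()
}

func (t *tx) onSucceed(cb func()) { t.callbacks = append(t.callbacks, cb) }

// withRawTx has the same shape as WithBadgerTx: it creates a fresh wrapper
// on every call and drops it as soon as f returns.
func withRawTx(f func(*tx) error) func(*rawTxn) error {
	return func(raw *rawTxn) error {
		return f(&tx{raw: raw}) // callbacks added by f live only on this temporary wrapper
	}
}

// update runs f against a single wrapper and fires its callbacks on success.
func update(f func(*tx) error) error {
	t := &tx{raw: &rawTxn{}}
	if err := f(t); err != nil {
		return err
	}
	for _, cb := range t.callbacks {
		cb()
	}
	return nil
}

// countCallbacks reports how many callbacks fire when an operation goes
// through the adapter versus when it is passed to update directly.
func countCallbacks() (viaAdapter, direct int) {
	op := func(counter *int) func(*tx) error {
		return func(t *tx) error {
			t.onSucceed(func() { *counter++ })
			return nil
		}
	}
	// Through the adapter: the callback lands on a temporary wrapper.
	_ = update(func(t *tx) error {
		return withRawTx(op(&viaAdapter))(t.raw)
	})
	// Directly: the callback lands on the wrapper that update inspects.
	_ = update(op(&direct))
	return viaAdapter, direct
}

func main() {
	viaAdapter, direct := countCallbacks()
	fmt.Println(viaAdapter, direct)
}
```

In this model the adapter path never fires the callback, which matches the concern that callbacks registered through such an adapter are silently dropped.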
Suggestions:
- my previous comment should remove the need for the function WithBadgerTx
- Add the following goDoc to Tx
    // Tx wraps a badger transaction and includes an additional slice for callbacks.
    // The callbacks are executed after the badger transaction completed _successfully_.
    // DESIGN PATTERN
    //   - DBTxn should never be nil
    //   - at initialization, `callbacks` is empty
    //   - While business logic code operates on `DBTxn`, it can append additional callbacks
    //     via the `OnSucceed` method. This generally happens during the transaction execution.
    //
    // CAUTION:
    //   - Tx is stateful (calls to `OnSucceed` change its internal state).
    //     Therefore, Tx needs to be passed as pointer variable.
    //   - Do not instantiate Tx outside of this package. Instead, use `Update` or `View`
    //     functions.
    //   - Whether a transaction is considered to have succeeded depends only on the return value
    //     of the outermost function. For example, consider a chain of 3 functions: f3( f2( f1(x)))
    //     Let's assume f1 fails with a `storage.ErrAlreadyExists` sentinel, which f2 expects and
    //     therefore discards. f3 could then succeed, i.e. return nil.
    //     Consequently, the entire list of callbacks is executed, including f1's callback if it
    //     added one. Callback implementations therefore need to account for this edge case.
    type Tx struct {
    	DBTxn     *dbbadger.Txn
    	callbacks []func()
    }
- Update the goDoc of function OnSucceed as follows:
    // OnSucceed adds a callback to execute after the batch has been successfully flushed.
    // Useful for implementing the cache where we will only cache after the batch of database
    // operations has been successfully applied.
    // CAUTION:
    // Whether a transaction is considered to have succeeded depends only on the return value
    // of the outermost function. For example, consider a chain of 3 functions: f3( f2( f1(x)))
    // Let's assume f1 fails with a `storage.ErrAlreadyExists` sentinel, which f2 expects and
    // therefore discards. f3 could then succeed, i.e. return nil.
    // Consequently, the entire list of callbacks is executed, including f1's callback if it
    // added one. Callback implementations therefore need to account for this edge case.
    func (b *Tx) OnSucceed(callback func()) {
- Extend the implementation to avoid the need to instantiate Tx instances outside of this package. For example, I would suggest including the View function, whose implementation is shared with the Update function
    // Update creates a badger transaction, passing it to a chain of functions.
    // Only if transaction succeeds, we run `callbacks` that were appended during the
    // transaction execution. The callbacks are useful to update caches in order to reduce
    // cache misses.
    func Update(db *dbbadger.DB, f func(*Tx) error) error {
    	dbTxn := db.NewTransaction(true)
    	err := run(f, dbTxn)
    	dbTxn.Discard()
    	return err
    }

    // View creates a read-only badger transaction, passing it to a chain of functions.
    // Only if transaction succeeds, we run `callbacks` that were appended during the
    // transaction execution. The callbacks are useful to update caches in order to reduce
    // cache misses.
    func View(db *dbbadger.DB, f func(*Tx) error) error {
    	dbTxn := db.NewTransaction(false)
    	err := run(f, dbTxn)
    	dbTxn.Discard()
    	return err
    }

    func run(f func(*Tx) error, dbTxn *dbbadger.Txn) error {
    	tx := &Tx{DBTxn: dbTxn}
    	err := f(tx)
    	if err != nil {
    		return err
    	}

    	err = dbTxn.Commit()
    	if err != nil {
    		return ioutils.TerminateOnFullDisk(err)
    	}

    	for _, callback := range tx.callbacks {
    		callback()
    	}
    	return nil
    }
- remove WithBadgerTx
good catch, I've applied changes that you have requested
For me, it would be most intuitive to explicitly mirror the interfaces protocol.InstanceParams
and
In other words: we could define a struct InstanceParams that only implements the protocol.InstanceParams interface. Something like the following:
type Params struct {
	protocol.GlobalParams
	protocol.InstanceParams
}

var _ protocol.InstanceParams = (*Params)(nil)
var _ protocol.GlobalParams = (*Params)(nil) // TODO(yuraolex): probably this is temporary since protocol state will be serving global params
var _ protocol.Params = (*Params)(nil)

// InstanceParams implements the interface protocol.InstanceParams. All functions
// are served on demand directly from the database, _without_ any caching.
type InstanceParams struct {
	state *State
}

var _ protocol.InstanceParams = (*InstanceParams)(nil)

func (p InstanceParams) EpochFallbackTriggered() (bool, error) {
	⋮
}

func (p InstanceParams) FinalizedRoot() (*flow.Header, error) {
	⋮
}

⋮
There is only a single point in the code where a Params struct is initialized
flow-go/state/protocol/badger/state.go
Lines 693 to 696 in 0295799
return Params{
	GlobalParams: state.protocolState.GlobalParams(),
	state:        state,
}
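As a standalone illustration of the embedding idea, the sketch below composes two small interfaces into one struct. The interfaces and their methods (`ChainID`, `FinalizedRootHeight`) are deliberately simplified, hypothetical stand-ins and not the actual protocol.GlobalParams or protocol.InstanceParams definitions.

```go
package main

import "fmt"

// GlobalParams and InstanceParams are simplified, hypothetical interfaces.
type GlobalParams interface {
	ChainID() string
}

type InstanceParams interface {
	FinalizedRootHeight() (uint64, error)
}

// Params is the union of both interfaces.
type Params interface {
	GlobalParams
	InstanceParams
}

// staticGlobalParams serves values that are fixed for the lifetime of the network.
type staticGlobalParams struct {
	chainID string
}

func (g staticGlobalParams) ChainID() string { return g.chainID }

// dbInstanceParams would read from the database in a real implementation;
// here it just returns a stored value.
type dbInstanceParams struct {
	rootHeight uint64
}

func (p dbInstanceParams) FinalizedRootHeight() (uint64, error) { return p.rootHeight, nil }

// params satisfies Params purely by embedding the two interfaces.
type params struct {
	GlobalParams
	InstanceParams
}

// Compile-time checks that the types implement the intended interfaces.
var _ Params = (*params)(nil)
var _ InstanceParams = (*dbInstanceParams)(nil)

// newParams is the single construction point, wiring both halves together.
func newParams(chainID string, rootHeight uint64) Params {
	return params{
		GlobalParams:   staticGlobalParams{chainID: chainID},
		InstanceParams: dbInstanceParams{rootHeight: rootHeight},
	}
}

func main() {
	p := newParams("flow-example", 42)
	height, err := p.FinalizedRootHeight()
	fmt.Println(p.ChainID(), height, err)
}
```

With this layout, each half can be swapped independently, for example once global params are served by the protocol state.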
I have implemented it, but I think we need to revisit how params are implemented: there is a strong coupling between State and InstanceParams.
Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>
https://github.com/dapperlabs/flow-go/issues/5514
Context
This PR introduces dynamic protocol state components in State, FollowerState, ParticipantState, core components that do critical protocol related work, such as: process information in blocks, extend block tree, process service events, transition epochs, serve data using snapshots, etc.
Changes in this PR are the absolute minimum needed to start relying on the protocol state for getting some of the data. In this PR, the protocol state is used to determine epoch phases, so all logic for epoch transitions and for processing service events moved from EpochStatuses to the dynamic protocol state. In the next issue, I will fetch all available data from the protocol state and remove the redundant data that we have right now.