Skip to content

Commit

Permalink
rename throttle
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Mar 4, 2024
1 parent 9a06c2b commit 44f6929
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions engine/execution/ingestion/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
// blocks until the execution has caught up
const CatchUpThreshold = 500

// Throttle is a helper struct that helps throttle the unexecuted blocks to be sent
// BlockThrottle is a helper struct that throttles the unexecuted blocks to be sent
// to the block queue for execution.
// It is useful for case when execution is falling far behind the finalization, in which case
// we want to throttle the blocks to be sent to the block queue for fetching data to execute
// them. Without throttle, the block queue will be flooded with blocks, and the network
// will be flooded with requests fetching collections, and the EN might quickly run out of memory.
type Throttle struct {
type BlockThrottle struct {
// config
threshold int // catch up threshold

Expand All @@ -46,13 +46,13 @@ type BlockHandler interface {
OnBlock(block *flow.Header) error
}

func NewThrottle(
func NewBlockThrottle(
log zerolog.Logger,
state protocol.State,
execState state.ExecutionState,
headers storage.Headers,
catchupThreshold int,
) (*Throttle, error) {
) (*BlockThrottle, error) {
finalizedHead, err := state.Final().Head()
if err != nil {
return nil, fmt.Errorf("could not get finalized head: %w", err)
Expand All @@ -67,7 +67,7 @@ func NewThrottle(
return nil, fmt.Errorf("executed finalized %v is greater than finalized %v", executed, finalized)
}

return &Throttle{
return &BlockThrottle{
threshold: catchupThreshold,
executed: executed,
finalized: finalized,
Expand All @@ -78,7 +78,7 @@ func NewThrottle(
}, nil
}

func (c *Throttle) Init(processables chan<- flow.Identifier) error {
func (c *BlockThrottle) Init(processables chan<- flow.Identifier) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.inited {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (c *Throttle) Init(processables chan<- flow.Identifier) error {
return nil
}

func (c *Throttle) OnBlockExecuted(executed uint64, _ flow.Identifier) error {
func (c *BlockThrottle) OnBlockExecuted(executed uint64, _ flow.Identifier) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -148,7 +148,7 @@ func (c *Throttle) OnBlockExecuted(executed uint64, _ flow.Identifier) error {
return nil
}

func (c *Throttle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertificate) {
func (c *BlockThrottle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertificate) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -165,7 +165,7 @@ func (c *Throttle) BlockProcessable(block *flow.Header, qc *flow.QuorumCertifica
c.processables <- qc.BlockID
}

func (c *Throttle) OnBlockFinalized(lastFinalized *flow.Header) {
func (c *BlockThrottle) OnBlockFinalized(lastFinalized *flow.Header) {
c.mu.Lock()
defer c.mu.Unlock()
if !c.inited {
Expand All @@ -183,7 +183,7 @@ func (c *Throttle) OnBlockFinalized(lastFinalized *flow.Header) {
c.finalized = lastFinalized.Height
}

func (c *Throttle) caughtUp() bool {
func (c *BlockThrottle) caughtUp() bool {
return caughtUp(c.executed, c.finalized, c.threshold)
}

Expand Down

0 comments on commit 44f6929

Please sign in to comment.