Skip to content

Commit

Permalink
V1.10.26 rebase wip (#289)
Browse files Browse the repository at this point in the history
* eth/protocols/snap: fix problems due to idle-but-busy peers (ethereum#25651)

* eth/protocols/snap: throttle trie heal requests when peers DoS us (ethereum#25666)

* eth/protocols/snap: throttle trie heal requests when peers DoS us

* eth/protocols/snap: lower heal throttle log to debug

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth/protocols/snap: fix comment

Co-authored-by: Martin Holst Swende <martin@swende.se>

* trie: check childrens' existence concurrently for snap heal (ethereum#25694)

* eth: fix a rare datarace on CHT challenge reply / shutdown (ethereum#25831)

* eth/filters: change filter block to be by-ref (ethereum#26054)

This PR changes the block field in the filter to be a pointer, to disambiguate between empty hash and no hash

* rpc: handle wrong HTTP batch response length (ethereum#26064)

* params: release geth v1.10.26 stable

* V1.10.25 statediff v4 wip (#275)

* Statediff Geth

Handle conflicts (#244)

* Handle conflicts

* Update go mod file versions

* Make lint changes

Disassociate block number from the indexer object

Update ipld-eth-db ref

Refactor builder code to make it reusable

Use prefix comparison for account selective statediffing

Update builder unit tests

Add mode to write to CSV files in statediff file writer (#249)

* Change file writing mode to csv files

* Implement writer interface for file indexer

* Implement option for csv or sql in file mode

* Close files in CSV writer

* Add tests for CSV file mode

* Implement CSV file for watched addresses

* Separate test configs for CSV and SQL

* Refactor common code for file indexer tests

Update indexer to include block hash in receipts and logs (#256)

* Update indexer to include block hash in receipts and logs

* Upgrade ipld-eth-db image in docker-compose to run tests

Use watched addresses from direct indexing params by default while serving statediff APIs (#262)

* Use watched addresses from direct indexing params in statediff APIs by default

* Avoid using indexer object when direct indexing is off

* Add nil check before accessing watched addresses from direct indexing params

Rebase missed these changes needed at 1.10.20

Flags cleanup for CLI changes and linter complaints

Linter appeasements to achieve perfection

enforce go 1.18 for check (#267)

* enforce go 1.18 for check

* tests on 1.18 as well

* adding db yml for possible change in docker-compose behavior in yml parsing

Add indexer tests for handling non canonical blocks (#254)

* Add indexer tests for header and transactions in a non canonical block

* Add indexer tests for receipts in a non-canonical block and refactor

* Add indexer tests for logs in a non-canonical block

* Add indexer tests for state and storage nodes in a non-canonical block

* Add indexer tests for non-canonical block at another height

* Avoid passing address of a pointer

* Update refs in GitHub workflow

* Add genesis file path to stack-orchestrator config in GitHub workflow

* Add descriptive comments

fix non-deterministic ordering in unit tests

Refactor indexer tests to avoid duplicate code (#270)

* Refactor indexer tests to avoid duplicate code

* Refactor file mode indexer tests

* Fix expected db stats for sqlx after tx closure

* Refactor indexer tests for legacy block

* Refactor mainnet indexer tests

* Refactor tests for watched addressess methods

* Fix query in legacy indexer test

rebase and resolve onto 1.10.23... still error out of index related to GetLeafKeys

changed trie.Commit behavior was subtle about not not flushing to disk without an Update

* no merge nodeset throws nil

* linter appeasement

Cerc refactor (#281)

* first pass cerc refactor in cicd

* 1st attempt to publish binary to git.vdb.to from github release

* docker build step mangled

* docker build step mangled

* wrong username for docker login... which still succeeded

* circcicd is not cerccicd

* bad hostname

adding manual override of binary publish to git.vdb.to for development/emergency (#282)

Cerc io publish fix (#284)

* adding manual override of binary publish to git.vdb.to for development/emergency

* Create manual_binary_publish.yaml (#283)

* github did not pick up workflow added outside of its UI and I still cannot spell cerc right

rawdb helper functions for cold levelDB sync export

Jenkins reborn (#285)

* initial build and output testing...  lots of trial and error

* clean up for working (but failing) unit test geth with ubuntu foundation image

* linter problem on comments in version

* trying linter appeasement with gofmt output on versions.go

Co-authored-by: Martin Holst Swende <martin@swende.se>
Co-authored-by: Péter Szilágyi <peterke@gmail.com>
Co-authored-by: Jordan Krage <jmank88@gmail.com>
Co-authored-by: Felix Lange <fjl@twurst.com>
  • Loading branch information
5 people authored Nov 4, 2022
1 parent 70d6dbb commit fda1723
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 71 deletions.
10 changes: 5 additions & 5 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash

block common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks

matcher *bloombits.Matcher
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(sys, addresses, topics)
filter.block = block
filter.block = &block
return filter
}

Expand All @@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) {
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
if f.block != nil {
header, err := f.sys.backend.HeaderByHash(ctx, *f.block)
if err != nil {
return nil, err
}
Expand Down
18 changes: 14 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if h.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
if err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
go func() {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If we have any explicit peer required block hashes, request them
for number, hash := range h.requiredBlocks {
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
if err != nil {
return err
}
go func(number uint64, hash common.Hash) {
go func(number uint64, hash common.Hash, req *eth.Request) {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
h.removePeer(peer.ID())
}
}(number, hash)
}(number, hash, req)
}
// Handle incoming messages until the connection is torn down
return handler(peer)
Expand Down
197 changes: 155 additions & 42 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -78,6 +80,29 @@ const (
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512

// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005

// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
minTrienodeHealThrottle = 1

// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount

// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33

// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)

var (
Expand Down Expand Up @@ -431,6 +456,11 @@ type Syncer struct {
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running

trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated

trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed
Expand Down Expand Up @@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
trienodeHealIdlers: make(map[string]struct{}),
bytecodeHealIdlers: make(map[string]struct{}),

trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
stateWriter: db.NewBatch(),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),

extProgress: new(SyncProgress),
}
Expand Down Expand Up @@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
if cap > maxTrieRequestCount {
cap = maxTrieRequestCount
}
cap = int(float64(cap) / s.trienodeHealThrottle)
if cap <= 0 {
cap = 1
}
var (
hashes = make([]common.Hash, 0, cap)
paths = make([]string, 0, cap)
Expand Down Expand Up @@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
)
for i, hash := range res.hashes {
node := res.nodes[i]

Expand All @@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
res.task.trieTasks[res.paths[i]] = res.hashes[i]
continue
}
fills++

// Push the trie node into the state syncer
s.trienodeHealSynced++
s.trienodeHealBytes += common.StorageSize(len(node))
Expand All @@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))

// Calculate the processing rate of one filled trie node
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))

// Update the currently measured trienode queueing and processing throughput.
//
// The processing rate needs to be updated uniformly independent if we've
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
// the face of varying network packets. As such, we cannot just measure the
// time it took to process N trie nodes and update once, we need one update
// per trie node.
//
// Naively, that would be:
//
// for i:=0; i<fills; i++ {
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
// }
//
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
//
// We can expand that formula for the Nth item as:
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
//
// The above is a geometric sequence that can be summed to:
// HR(N) = (1-MI)^N*(OR-NR) + NR
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate

pending := atomic.LoadUint64(&s.trienodeHealPend)
if time.Since(s.trienodeHealThrottled) > time.Second {
// Periodically adjust the trie node throttler
if float64(pending) > 2*s.trienodeHealRate {
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
} else {
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
}
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
s.trienodeHealThrottle = maxTrienodeHealThrottle
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
s.trienodeHealThrottle = minTrienodeHealThrottle
}
s.trienodeHealThrottled = time.Now()

log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
}
}

// processBytecodeHealResponse integrates an already validated bytecode response
Expand Down Expand Up @@ -2248,14 +2333,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.accountReqs[id]
if !ok {
Expand Down Expand Up @@ -2360,14 +2449,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.bytecodeReqs[id]
if !ok {
Expand Down Expand Up @@ -2469,14 +2562,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.storageReqs[id]
if !ok {
Expand Down Expand Up @@ -2596,14 +2693,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.trienodeHealReqs[id]
if !ok {
Expand Down Expand Up @@ -2639,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error

// Cross reference the requested trienodes with the response to find gaps
// that the serving node is missing
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash := make([]byte, 32)

nodes := make([][]byte, len(req.hashes))
var (
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
nodes = make([][]byte, len(req.hashes))
fills uint64
)
for i, j := 0, 0; i < len(trienodes); i++ {
// Find the next hash that we've been served, leaving misses with nils
hasher.Reset()
Expand All @@ -2654,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
}
if j < len(req.hashes) {
nodes[j] = trienodes[i]
fills++
j++
continue
}
// We've either ran out of hashes, or got unrequested data
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)

// Signal this request as failed, and ready for rescheduling
s.scheduleRevertTrienodeHealRequest(req)
return errors.New("unexpected healing trienode")
}
// Response validated, send it to the scheduler for filling
atomic.AddUint64(&s.trienodeHealPend, fills)
defer func() {
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
}()
response := &trienodeHealResponse{
paths: req.paths,
task: req.task,
Expand Down Expand Up @@ -2691,14 +2800,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.bytecodeHealReqs[id]
if !ok {
Expand Down
Loading

0 comments on commit fda1723

Please sign in to comment.