Skip to content

Commit

Permalink
core, eth, les: fix messy code (#15367)
Browse files Browse the repository at this point in the history
* core, eth, les: fix messy code

* les: fixed tx status test and rlp encoding

* core: add a workaround for light sync
  • Loading branch information
karalabe authored Oct 25, 2017
1 parent ca376ea commit 0095531
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 203 deletions.
67 changes: 24 additions & 43 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,16 @@ type partialMatches struct {
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The contest and error fields are used by the light client to terminate matching
// early if an error is enountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Error error
Context context.Context

Context context.Context
Error error
}

// Matcher is a pipelined system of schedulers and logic matchers which perform
Expand Down Expand Up @@ -506,54 +510,31 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher

quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown
ctx context.Context
err error
stopping bool
lock sync.Mutex
pend sync.WaitGroup
closer sync.Once // Sync object to ensure we only ever close once
quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown

ctx context.Context // Context used by the light client to abort filtering
err atomic.Value // Global error to track retrieval failures deep in the chain

pend sync.WaitGroup
}

// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
s.lock.Lock()
stopping := s.stopping
s.stopping = true
s.lock.Unlock()
// ensure that we only close the session once
if stopping {
return
}

// Bail out if the matcher is not running
select {
case <-s.quit:
return
default:
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
s.closer.Do(func() {
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
})
}

// setError sets an error and stops the session
func (s *MatcherSession) setError(err error) {
s.lock.Lock()
s.err = err
s.lock.Unlock()
s.Close()
}

// Error returns an error if one has happened during the session
// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
s.lock.Lock()
defer s.lock.Unlock()

return s.err
return s.err.Load().(error)
}

// AllocateRetrieval assigns a bloom bit index to a client process that can either
Expand Down Expand Up @@ -655,9 +636,9 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan

result := <-request
if result.Error != nil {
s.setError(result.Error)
s.err.Store(result.Error)
s.Close()
}

s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
Expand Down
49 changes: 25 additions & 24 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,25 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(section uint64, lastSectionHead common.Hash) error
Reset(section uint64, prevHead common.Hash) error

// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)

// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
// Commit finalizes the section metadata and stores it into the database.
Commit() error
}

// ChainIndexerChain interface is used for connecting the indexer to a blockchain
type ChainIndexerChain interface {
// CurrentHeader retrieves the latest locally known header.
CurrentHeader() *types.Header

// SubscribeChainEvent subscribes to new head header notifications.
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
Expand Down Expand Up @@ -114,21 +122,14 @@ func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
c.setValidSections(section + 1)
}

// IndexerChain interface is used for connecting the indexer to a blockchain
type IndexerChain interface {
CurrentHeader() *types.Header
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain IndexerChain) {
ch := make(chan ChainEvent, 10)
sub := chain.SubscribeChainEvent(ch)
currentHeader := chain.CurrentHeader()
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainEvent, 10)
sub := chain.SubscribeChainEvent(events)

go c.eventLoop(currentHeader, ch, sub)
go c.eventLoop(chain.CurrentHeader(), events, sub)
}

// Close tears down all goroutines belonging to the indexer and returns any error
Expand All @@ -149,14 +150,12 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}

// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}

// Return any failures
switch {
case len(errs) == 0:
Expand All @@ -173,7 +172,7 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)

Expand All @@ -193,7 +192,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
errc <- nil
return

case ev, ok := <-ch:
case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
Expand All @@ -202,6 +201,8 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
// Reorg to the common ancestor (might not exist in light sync mode, skip reorg then)
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
Expand Down Expand Up @@ -259,8 +260,8 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
updated time.Time
updateMsg bool
updating bool
updated time.Time
)

for {
Expand All @@ -277,7 +278,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updateMsg = true
updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
Expand All @@ -300,8 +301,8 @@ func (c *ChainIndexer) updateLoop() {
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updateMsg {
updateMsg = false
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info("Finished upgrading chain index")
}

Expand Down Expand Up @@ -412,7 +413,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
c.storedSections = sections // needed if new > old
}

// sectionHead retrieves the last block hash of a processed section from the
// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
Expand Down
2 changes: 1 addition & 1 deletion core/chain_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}

func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error {
func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
Expand Down
10 changes: 5 additions & 5 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,13 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
type txPricedList struct {
all *map[common.Hash]txLookupRec // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
}

// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *map[common.Hash]txLookupRec) *txPricedList {
func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
Expand All @@ -416,7 +416,7 @@ func (l *txPricedList) Removed() {

l.stales, l.items = 0, &reheap
for _, tx := range *l.all {
*l.items = append(*l.items, tx.tx)
*l.items = append(*l.items, tx)
}
heap.Init(l.items)
}
Expand Down
Loading

0 comments on commit 0095531

Please sign in to comment.