From 1d264e343d909c8f36a4be54dc4ef724a461235c Mon Sep 17 00:00:00 2001 From: Somnath Banerjee Date: Thu, 11 Apr 2024 10:18:45 +0200 Subject: [PATCH] Add kv.log pruning alongside logIndex prune --- eth/stagedsync/stage_log_index.go | 24 ++++++++++++++++++++++-- eth/stagedsync/stage_log_index_test.go | 9 +++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/eth/stagedsync/stage_log_index.go b/eth/stagedsync/stage_log_index.go index 43a4de58dd4..9f7d91833b0 100644 --- a/eth/stagedsync/stage_log_index.go +++ b/eth/stagedsync/stage_log_index.go @@ -106,6 +106,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte return nil } +// Add the topics and address index for logs, if not in prune range or addr in noPruneContracts func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, endBlock uint64, pruneBlock uint64, cfg LogIndexCfg, ctx context.Context, logger log.Logger) error { quit := ctx.Done() logEvery := time.NewTicker(30 * time.Second) @@ -377,13 +378,18 @@ func pruneOldLogChunks(tx kv.RwTx, bucket string, inMem *etl.Collector, pruneTo if err != nil { return err } - blockNum := uint64(binary.BigEndian.Uint32(k[len(key):])) + var blockNum uint64 + if bucket == kv.Log { + blockNum = binary.BigEndian.Uint64(k) + } else { + blockNum = uint64(binary.BigEndian.Uint32(k[len(key):])) + } if !bytes.HasPrefix(k, key) || blockNum >= pruneTo { break } if err = c.DeleteCurrent(); err != nil { - return fmt.Errorf("failed delete, block=%d: %w", blockNum, err) + return fmt.Errorf("failed delete log/index, bucket=%v block=%d: %w", bucket, blockNum, err) } } return nil @@ -395,6 +401,7 @@ func pruneOldLogChunks(tx kv.RwTx, bucket string, inMem *etl.Collector, pruneTo return nil } +// Call pruneLogIndex with the current current sync progresses and commit the data to db func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, logger log.Logger) (err error) { if !cfg.prune.Receipts.Enabled() { return nil @@ -427,6 +434,7 @@ func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte return nil } +// Prune log indexes as well as logs within the prune range func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneFrom, pruneTo uint64, ctx context.Context, logger log.Logger, noPruneContracts map[libcommon.Address]bool) error { logEvery := time.NewTicker(logInterval) defer logEvery.Stop() @@ -436,6 +444,8 @@ func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneFrom, prune defer topics.Close() addrs := etl.NewCollector(logPrefix, tmpDir, etl.NewOldestEntryBuffer(bufferSize), logger) defer addrs.Close() + pruneLogKeyCollector := etl.NewCollector(logPrefix, tmpDir, etl.NewOldestEntryBuffer(bufferSize), logger) + defer pruneLogKeyCollector.Close() reader := bytes.NewReader(nil) { @@ -467,8 +477,10 @@ func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneFrom, prune return fmt.Errorf("receipt unmarshal failed: %w, block=%d", err, binary.BigEndian.Uint64(k)) } + notToPrune := false // To identify whether this log key has addr in noPruneContracts for _, l := range logs { if noPruneContracts != nil && noPruneContracts[l.Address] { + notToPrune = true continue } for _, topic := range l.Topics { @@ -480,6 +492,11 @@ func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneFrom, prune return err } } + // No logs (or sublogs) for this txId should be pruned + // if one of the logs belongs to noPruneContracts list + if !notToPrune { + pruneLogKeyCollector.Collect(k, nil) + } } } @@ -489,5 +506,8 @@ func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneFrom, prune if err := pruneOldLogChunks(tx, kv.LogAddressIndex, addrs, pruneTo, ctx); err != nil { return err } + if err := pruneOldLogChunks(tx, kv.Log, pruneLogKeyCollector, pruneTo, ctx); err != nil { + return err + } return nil } diff --git a/eth/stagedsync/stage_log_index_test.go b/eth/stagedsync/stage_log_index_test.go index 93e6bf75a6c..a16eee3f27f 100644 --- a/eth/stagedsync/stage_log_index_test.go +++ b/eth/stagedsync/stage_log_index_test.go @@ -158,6 +158,15 @@ func TestPruneLogIndex(t *testing.T) { require.NoError(err) require.True(total == 3) } + { + total := 0 + err = tx.ForEach(kv.Log, nil, func(k, v []byte) error { + total++ + return nil + }) + require.NoError(err) + require.True(total == 49) // 51 logs have been pruned + } } func TestUnwindLogIndex(t *testing.T) {