
Merge pull request #4811 from onflow/leo/read-trie-root-hash-from-checkpoint

read trie root hash from checkpoint
zhangchiqing authored Oct 16, 2023
2 parents e5db9ee + d4731b9 commit 0435707
Showing 3 changed files with 120 additions and 8 deletions.
37 changes: 29 additions & 8 deletions ledger/complete/mtrie/flattener/encoding.go
@@ -29,6 +29,7 @@ const (
encPayloadLengthSize = 4

encodedTrieSize = encNodeIndexSize + encRegCountSize + encRegSizeSize + encHashSize
EncodedTrieSize = encodedTrieSize
)

const payloadEncodingVersion = 1
@@ -268,6 +269,13 @@ func ReadNode(reader io.Reader, scratch []byte, getNode func(nodeIndex uint64) (
return n, nil
}

type EncodedTrie struct {
RootIndex uint64
RegCount uint64
RegSize uint64
RootHash hash.Hash
}

// EncodeTrie encodes trie in the following format:
// - root node index (8 byte)
// - allocated reg count (8 byte)
@@ -305,17 +313,15 @@ func EncodeTrie(trie *trie.MTrie, rootIndex uint64, scratch []byte) []byte {
return buf[:pos]
}

-// ReadTrie reconstructs a trie from data read from reader.
-func ReadTrie(reader io.Reader, scratch []byte, getNode func(nodeIndex uint64) (*node.Node, error)) (*trie.MTrie, error) {
-
+func ReadEncodedTrie(reader io.Reader, scratch []byte) (EncodedTrie, error) {
if len(scratch) < encodedTrieSize {
scratch = make([]byte, encodedTrieSize)
}

// Read encoded trie
_, err := io.ReadFull(reader, scratch[:encodedTrieSize])
if err != nil {
return nil, fmt.Errorf("failed to read serialized trie: %w", err)
return EncodedTrie{}, fmt.Errorf("failed to read serialized trie: %w", err)
}

pos := 0
@@ -335,21 +341,36 @@ func ReadTrie(reader io.Reader, scratch []byte, getNode func(nodeIndex uint64) (
// Decode root node hash
readRootHash, err := hash.ToHash(scratch[pos : pos+encHashSize])
if err != nil {
return nil, fmt.Errorf("failed to decode hash of serialized trie: %w", err)
return EncodedTrie{}, fmt.Errorf("failed to decode hash of serialized trie: %w", err)
}

return EncodedTrie{
RootIndex: rootIndex,
RegCount: regCount,
RegSize: regSize,
RootHash: readRootHash,
}, nil
}

// ReadTrie reconstructs a trie from data read from reader.
func ReadTrie(reader io.Reader, scratch []byte, getNode func(nodeIndex uint64) (*node.Node, error)) (*trie.MTrie, error) {
encodedTrie, err := ReadEncodedTrie(reader, scratch)
if err != nil {
return nil, err
}

-rootNode, err := getNode(rootIndex)
+rootNode, err := getNode(encodedTrie.RootIndex)
if err != nil {
return nil, fmt.Errorf("failed to find root node of serialized trie: %w", err)
}

-mtrie, err := trie.NewMTrie(rootNode, regCount, regSize)
+mtrie, err := trie.NewMTrie(rootNode, encodedTrie.RegCount, encodedTrie.RegSize)
if err != nil {
return nil, fmt.Errorf("failed to restore serialized trie: %w", err)
}

rootHash := mtrie.RootHash()
-if !rootHash.Equals(ledger.RootHash(readRootHash)) {
+if !rootHash.Equals(ledger.RootHash(encodedTrie.RootHash)) {
return nil, fmt.Errorf("failed to restore serialized trie: roothash doesn't match")
}

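Side note on the refactor above: ReadTrie now delegates the fixed-size header decoding to ReadEncodedTrie, which is exported so other callers (such as the new checkpoint reader below) can decode a trie record without materializing the trie. A minimal caller sketch, assuming only what this diff exports (EncodedTrieSize, ReadEncodedTrie, and the EncodedTrie fields); the helper name and inputs are hypothetical:

// Hypothetical caller sketch (not part of this PR): decoding a single
// fixed-size trie record produced by flattener.EncodeTrie.
package main

import (
	"bytes"
	"fmt"

	"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
)

func decodeTrieRecord(encoded []byte) error {
	scratch := make([]byte, flattener.EncodedTrieSize)
	et, err := flattener.ReadEncodedTrie(bytes.NewReader(encoded), scratch)
	if err != nil {
		return fmt.Errorf("could not decode trie record: %w", err)
	}
	// The decoded record carries everything needed to inspect (or later rebuild)
	// the trie: root node index, register count/size, and the root hash.
	fmt.Printf("root index: %d, reg count: %d, reg size: %d, root hash: %x\n",
		et.RootIndex, et.RegCount, et.RegSize, et.RootHash)
	return nil
}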
57 changes: 57 additions & 0 deletions ledger/complete/wal/checkpoint_v6_reader.go
@@ -11,6 +11,7 @@ import (

"github.com/rs/zerolog"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/mtrie/flattener"
"github.com/onflow/flow-go/ledger/complete/mtrie/node"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
@@ -19,6 +20,8 @@ import (
// ErrEOFNotReached for indicating end of file not reached error
var ErrEOFNotReached = errors.New("expect to reach EOF, but actually didn't")

var ReadTriesRootHash = readTriesRootHash

// readCheckpointV6 reads checkpoint file from a main file and 17 file parts.
// the main file stores:
// - version
@@ -631,6 +634,60 @@ func readTopLevelTries(dir string, fileName string, subtrieNodes [][]*node.Node,
return tries, nil
}

func readTriesRootHash(logger zerolog.Logger, dir string, fileName string) (
trieRoots []ledger.RootHash,
errToReturn error,
) {

filepath, _ := filePathTopTries(dir, fileName)
file, err := os.Open(filepath)
if err != nil {
return nil, fmt.Errorf("could not open file %v: %w", filepath, err)
}
defer func(file *os.File) {
evictErr := evictFileFromLinuxPageCache(file, false, logger)
if evictErr != nil {
logger.Warn().Msgf("failed to evict top trie file %s from Linux page cache: %s", filepath, evictErr)
// No need to return this error because it's possible to continue normal operations.
}
errToReturn = closeAndMergeError(file, errToReturn)
}(file)

// read and validate magic bytes and version
err = validateFileHeader(MagicBytesCheckpointToptrie, VersionV6, file)
if err != nil {
return nil, err
}

// read subtrie Node count and validate
_, triesCount, _, err := readTopTriesFooter(file)
if err != nil {
return nil, fmt.Errorf("could not read top tries footer: %w", err)
}

footerOffset := encNodeCountSize + encTrieCountSize + crc32SumSize
trieRootOffset := footerOffset + flattener.EncodedTrieSize*int(triesCount)

_, err = file.Seek(int64(-trieRootOffset), io.SeekEnd)
if err != nil {
return nil, fmt.Errorf("could not seek to 0: %w", err)
}

reader := bufio.NewReaderSize(file, defaultBufioReadSize)
trieRoots = make([]ledger.RootHash, 0, triesCount)
scratch := make([]byte, 1024*4) // must not be less than 1024
for i := 0; i < int(triesCount); i++ {
trieRootNode, err := flattener.ReadEncodedTrie(reader, scratch)
if err != nil {
return nil, fmt.Errorf("could not read trie root node: %w", err)
}

trieRoots = append(trieRoots, ledger.RootHash(trieRootNode.RootHash))
}

return trieRoots, nil
}

func readFileHeader(reader io.Reader) (uint16, uint16, error) {
bytes := make([]byte, encMagicSize+encVersionSize)
_, err := io.ReadFull(reader, bytes)
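For context on the Seek call in readTriesRootHash: the function never scans the top-tries file from the front. It jumps backwards from the end of the file, past the footer (node count, trie count) and the CRC32 checksum, to where the block of fixed-size encoded trie records begins, and reads only those records. A minimal sketch of that offset arithmetic; the concrete field sizes are assumptions, not taken from this diff:

// Hypothetical sketch of the offset arithmetic behind the Seek in
// readTriesRootHash. Assumed sizes (not shown in this diff): 8-byte node
// count, 2-byte trie count, 4-byte CRC32 sum, and a 56-byte encoded trie
// record (8 + 8 + 8 + 32).
package main

import "fmt"

const (
	assumedNodeCountSize  = 8
	assumedTrieCountSize  = 2
	assumedCRC32SumSize   = 4
	assumedTrieRecordSize = 56
)

// trieRootSeekOffset returns how far back from the end of the top-tries file
// the first encoded trie record starts, so that file.Seek(-offset, io.SeekEnd)
// lands exactly on it.
func trieRootSeekOffset(triesCount int) int64 {
	footer := assumedNodeCountSize + assumedTrieCountSize + assumedCRC32SumSize
	return int64(footer + assumedTrieRecordSize*triesCount)
}

func main() {
	// With, say, 500 tries in the checkpoint, the reader seeks back
	// 14 + 56*500 = 28014 bytes from the end of the file.
	fmt.Println(trieRootSeekOffset(500))
}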
34 changes: 34 additions & 0 deletions ledger/complete/wal/checkpoint_v6_test.go
@@ -590,3 +590,37 @@ func TestCopyCheckpointFileV6(t *testing.T) {
requireTriesEqual(t, tries, decoded)
})
}

func TestReadCheckpointRootHash(t *testing.T) {
unittest.RunWithTempDir(t, func(dir string) {
tries := createSimpleTrie(t)
fileName := "checkpoint"
logger := unittest.Logger()
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, logger), "fail to store checkpoint")

trieRoots, err := readTriesRootHash(logger, dir, fileName)
require.NoError(t, err)
for i, root := range trieRoots {
expectedHash := tries[i].RootHash()
require.Equal(t, expectedHash, root)
}
require.Equal(t, len(tries), len(trieRoots))
})
}

func TestReadCheckpointRootHashMulti(t *testing.T) {
unittest.RunWithTempDir(t, func(dir string) {
tries := createMultipleRandomTries(t)
fileName := "checkpoint"
logger := unittest.Logger()
require.NoErrorf(t, StoreCheckpointV6Concurrently(tries, dir, fileName, logger), "fail to store checkpoint")

trieRoots, err := readTriesRootHash(logger, dir, fileName)
require.NoError(t, err)
for i, root := range trieRoots {
expectedHash := tries[i].RootHash()
require.Equal(t, expectedHash, root)
}
require.Equal(t, len(tries), len(trieRoots))
})
}
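A hypothetical caller sketch (not from this PR) for the newly exported ReadTriesRootHash hook: it lists the root hashes recorded in a stored checkpoint without reconstructing any trie, which is exactly what the two tests above check against tries[i].RootHash(). The directory and file name below are placeholders:

// Hypothetical usage of wal.ReadTriesRootHash; paths are placeholders.
package main

import (
	"fmt"
	"os"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/ledger/complete/wal"
)

func main() {
	logger := zerolog.New(os.Stderr)
	roots, err := wal.ReadTriesRootHash(logger, "/var/flow/bootstrap", "checkpoint")
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to read trie root hashes: %v\n", err)
		os.Exit(1)
	}
	for i, root := range roots {
		fmt.Printf("trie %d root hash: %v\n", i, root)
	}
}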
