Skip to content

Commit

Permalink
Merge pull request #4727 from onflow/amlandeep/pebble-checkpoint-inge…
Browse files Browse the repository at this point in the history
…stion

[Access] Pebble checkpoint ingestion
  • Loading branch information
koko1123 authored Sep 29, 2023
2 parents 8104663 + 0532094 commit 603e204
Show file tree
Hide file tree
Showing 5 changed files with 405 additions and 2 deletions.
141 changes: 141 additions & 0 deletions storage/pebble/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package pebble

import (
"context"
"errors"
"fmt"
"path/filepath"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/ledger/complete/wal"
)

// ErrAlreadyBootstrapped is the sentinel error for an already bootstrapped pebble instance
var ErrAlreadyBootstrapped = errors.New("found latest key set on badger instance, DB is already bootstrapped")

type RegisterBootstrap struct {
checkpointDir string
checkpointFileName string
log zerolog.Logger
db *pebble.DB
leafNodeChan chan *wal.LeafNode
rootHeight uint64
}

// NewRegisterBootstrap creates the bootstrap object for reading checkpoint data and the height tracker in pebble
// This object must be initialized and RegisterBootstrap.IndexCheckpointFile must be run to have the pebble db instance
// in the correct state to initialize a Registers store.
func NewRegisterBootstrap(
db *pebble.DB,
checkpointFile string,
rootHeight uint64,
log zerolog.Logger,
) (*RegisterBootstrap, error) {
// check for pre-populated heights, fail if it is populated
// i.e. the IndexCheckpointFile function has already run for the db in this directory
isBootstrapped, err := IsBootstrapped(db)
if err != nil {
return nil, err
}
if isBootstrapped {
// key detected, attempt to run bootstrap on corrupt or already bootstrapped data
return nil, ErrAlreadyBootstrapped
}
checkpointDir, checkpointFileName := filepath.Split(checkpointFile)
return &RegisterBootstrap{
checkpointDir: checkpointDir,
checkpointFileName: checkpointFileName,
log: log.With().Str("module", "register_bootstrap").Logger(),
db: db,
leafNodeChan: make(chan *wal.LeafNode, checkpointLeafNodeBufSize),
rootHeight: rootHeight,
}, nil
}

func (b *RegisterBootstrap) batchIndexRegisters(leafNodes []*wal.LeafNode) error {
b.log.Debug().Int("batch_size", len(leafNodes)).Msg("indexing batch of leaf nodes")
batch := b.db.NewBatch()
defer batch.Close()
for _, register := range leafNodes {
payload := register.Payload
key, err := payload.Key()
if err != nil {
return fmt.Errorf("could not get key from register payload: %w", err)
}

registerID, err := convert.LedgerKeyToRegisterID(key)
if err != nil {
return fmt.Errorf("could not get register ID from key: %w", err)
}

encoded := newLookupKey(b.rootHeight, registerID).Bytes()
err = batch.Set(encoded, payload.Value(), nil)
if err != nil {
return fmt.Errorf("failed to set key: %w", err)
}
}
err := batch.Commit(pebble.Sync)
if err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
return nil
}

// indexCheckpointFileWorker asynchronously indexes register entries in b.checkpointDir
// with wal.OpenAndReadLeafNodesFromCheckpointV6
func (b *RegisterBootstrap) indexCheckpointFileWorker(ctx context.Context) error {
b.log.Info().Msg("started checkpoint index worker")
// collect leaf nodes to batch index until the channel is closed
batch := make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
for leafNode := range b.leafNodeChan {
select {
case <-ctx.Done():
return nil
default:
batch = append(batch, leafNode)
if len(batch) >= pebbleBootstrapRegisterBatchLen {
err := b.batchIndexRegisters(batch)
if err != nil {
return fmt.Errorf("unable to index registers to pebble in batch: %w", err)
}
batch = make([]*wal.LeafNode, 0, pebbleBootstrapRegisterBatchLen)
}
}
}
// index the remaining registers if didn't reach a batch length.
err := b.batchIndexRegisters(batch)
if err != nil {
return fmt.Errorf("unable to index remaining registers to pebble: %w", err)
}
return nil
}

// IndexCheckpointFile indexes the checkpoint file in the Dir provided
func (b *RegisterBootstrap) IndexCheckpointFile(ctx context.Context) error {
cct, cancel := context.WithCancel(ctx)
defer cancel()
g, gCtx := errgroup.WithContext(cct)
b.log.Info().Msg("indexing checkpoint file for pebble register store")
for i := 0; i < pebbleBootstrapWorkerCount; i++ {
g.Go(func() error {
return b.indexCheckpointFileWorker(gCtx)
})
}
err := wal.OpenAndReadLeafNodesFromCheckpointV6(b.leafNodeChan, b.checkpointDir, b.checkpointFileName, b.log)
if err != nil {
return fmt.Errorf("error reading leaf node: %w", err)
}
if err = g.Wait(); err != nil {
return fmt.Errorf("failed to index checkpoint file: %w", err)
}
b.log.Info().Msg("checkpoint indexing complete")
err = initHeights(b.db, b.rootHeight)
if err != nil {
return fmt.Errorf("could not index latest height: %w", err)
}
return nil
}
250 changes: 250 additions & 0 deletions storage/pebble/bootstrap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package pebble

import (
"context"
"encoding/binary"
"fmt"
"io"
"os"
"path"
"testing"

"github.com/cockroachdb/pebble"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/ledger/common/testutils"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

const defaultRegisterValue = byte('v')

func TestRegisterBootstrap_NewBootstrap(t *testing.T) {
t.Parallel()
unittest.RunWithTempDir(t, func(dir string) {
rootHeight := uint64(1)
log := zerolog.New(io.Discard)
p, err := OpenRegisterPebbleDB(dir)
require.NoError(t, err)
// set heights
require.NoError(t, initHeights(p, rootHeight))
// errors if FirstHeight or LastHeight are populated
_, err = NewRegisterBootstrap(p, dir, rootHeight, log)
require.ErrorIs(t, err, ErrAlreadyBootstrapped)
})
}

func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) {
t.Parallel()
log := zerolog.New(io.Discard)
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries, registerIDs := simpleTrieWithValidRegisterIDs(t)
fileName := "simple-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
pb, dbDir := createPebbleForTest(t)

bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
require.Equal(t, reg.FirstHeight(), rootHeight)

for _, register := range registerIDs {
val, err := reg.Get(*register, rootHeight)
require.NoError(t, err)
require.Equal(t, val, []byte{defaultRegisterValue})
}

require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})
}

func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) {
t.Parallel()
log := zerolog.New(io.Discard)
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries := []*trie.MTrie{trie.NewEmptyMTrie()}
fileName := "empty-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
pb, dbDir := createPebbleForTest(t)

bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
require.Equal(t, reg.FirstHeight(), rootHeight)

require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})
}

func TestRegisterBootstrap_IndexCheckpointFile_FormatIssue(t *testing.T) {
t.Parallel()
pa1 := testutils.PathByUint8(0)
pa2 := testutils.PathByUint8(1)
rootHeight := uint64(666)
pl1 := testutils.LightPayload8('A', 'A')
pl2 := testutils.LightPayload('B', 'B')
paths := []ledger.Path{pa1, pa2}
payloads := []ledger.Payload{*pl1, *pl2}
emptyTrie := trie.NewEmptyMTrie()
trieWithInvalidEntry, _, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true)
require.NoError(t, err)
log := zerolog.New(io.Discard)

unittest.RunWithTempDir(t, func(dir string) {
fileName := "invalid-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently([]*trie.MTrie{trieWithInvalidEntry}, dir, fileName, log),
"fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
pb, dbDir := createPebbleForTest(t)

bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.ErrorContains(t, err, "unexpected ledger key format")
require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})

}

func TestRegisterBootstrap_IndexCheckpointFile_CorruptedCheckpointFile(t *testing.T) {
t.Parallel()
rootHeight := uint64(666)
log := zerolog.New(io.Discard)
unittest.RunWithTempDir(t, func(dir string) {
tries, _ := largeTrieWithValidRegisterIDs(t)
checkpointFileName := "large-checkpoint-incomplete"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, checkpointFileName, log), "fail to store checkpoint")
// delete 2nd part of the file (2nd subtrie)
fileToDelete := path.Join(dir, fmt.Sprintf("%v.%03d", checkpointFileName, 2))
err := os.RemoveAll(fileToDelete)
require.NoError(t, err)
pb, dbDir := createPebbleForTest(t)
bootstrap, err := NewRegisterBootstrap(pb, checkpointFileName, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.ErrorIs(t, err, os.ErrNotExist)
require.NoError(t, os.RemoveAll(dbDir))
})
}

func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) {
t.Parallel()
log := zerolog.New(io.Discard)
rootHeight := uint64(10000)
unittest.RunWithTempDir(t, func(dir string) {
tries, registerIDs := largeTrieWithValidRegisterIDs(t)
fileName := "large-checkpoint"
require.NoErrorf(t, wal.StoreCheckpointV6Concurrently(tries, dir, fileName, log), "fail to store checkpoint")
checkpointFile := path.Join(dir, fileName)
pb, dbDir := createPebbleForTest(t)
bootstrap, err := NewRegisterBootstrap(pb, checkpointFile, rootHeight, log)
require.NoError(t, err)
err = bootstrap.IndexCheckpointFile(context.Background())
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
require.Equal(t, reg.FirstHeight(), rootHeight)

for _, register := range registerIDs {
val, err := reg.Get(*register, rootHeight)
require.NoError(t, err)
require.Equal(t, val, []byte{defaultRegisterValue})
}

require.NoError(t, pb.Close())
require.NoError(t, os.RemoveAll(dbDir))
})

}

func simpleTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) {
return trieWithValidRegisterIDs(t, 2)
}

func largeTrieWithValidRegisterIDs(t *testing.T) ([]*trie.MTrie, []*flow.RegisterID) {
// large enough trie so every worker should have something to index
largeTrieSize := 2 * pebbleBootstrapRegisterBatchLen * pebbleBootstrapWorkerCount
return trieWithValidRegisterIDs(t, uint16(largeTrieSize))
}

func trieWithValidRegisterIDs(t *testing.T, n uint16) ([]*trie.MTrie, []*flow.RegisterID) {
emptyTrie := trie.NewEmptyMTrie()
resultRegisterIDs := make([]*flow.RegisterID, 0, n)
paths := randomRegisterPaths(n)
payloads := randomRegisterPayloads(n)
for _, payload := range payloads {
key, err := payload.Key()
require.NoError(t, err)
regID, err := convert.LedgerKeyToRegisterID(key)
require.NoError(t, err)
resultRegisterIDs = append(resultRegisterIDs, &regID)
}
populatedTrie, depth, err := trie.NewTrieWithUpdatedRegisters(emptyTrie, paths, payloads, true)
// make sure it has at least 1 leaf node
require.GreaterOrEqual(t, depth, uint16(1))
require.NoError(t, err)
resultTries := []*trie.MTrie{emptyTrie, populatedTrie}
return resultTries, resultRegisterIDs
}

func randomRegisterPayloads(n uint16) []ledger.Payload {
p := make([]ledger.Payload, 0, n)
for i := uint16(0); i < n; i++ {
o := make([]byte, 0, 8)
o = binary.BigEndian.AppendUint16(o, n)
k := ledger.Key{KeyParts: []ledger.KeyPart{
{Type: convert.KeyPartOwner, Value: o},
{Type: convert.KeyPartKey, Value: o},
}}
// values are always 'v' for ease of testing/checking
v := ledger.Value{defaultRegisterValue}
pl := ledger.NewPayload(k, v)
p = append(p, *pl)
}
return p
}

func randomRegisterPaths(n uint16) []ledger.Path {
p := make([]ledger.Path, 0, n)
for i := uint16(0); i < n; i++ {
p = append(p, testutils.PathByUint16(i))
}
return p
}

func createPebbleForTest(t *testing.T) (*pebble.DB, string) {
dbDir := unittest.TempPebblePath(t)
pb, err := OpenRegisterPebbleDB(dbDir)
require.NoError(t, err)
return pb, dbDir
}
Loading

0 comments on commit 603e204

Please sign in to comment.