Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channeldb: add optional migration to prune revocation logs #6469

Merged
merged 13 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 104 additions & 9 deletions channeldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration26"
"github.com/lightningnetwork/lnd/channeldb/migration27"
"github.com/lightningnetwork/lnd/channeldb/migration29"
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/kvdb"
Expand All @@ -45,17 +46,34 @@ var (
// up-to-date version of the database.
type migration func(tx kvdb.RwTx) error

type version struct {
// mandatoryVersion defines a db version that must be applied before the lnd
// starts.
type mandatoryVersion struct {
number uint32
migration migration
}

// optionalMigration defines an optional migration function. When a migration
// is optional, it usually involves a large scale of changes that might touch
// millions of keys. Due to OOM concern, the update cannot be safely done
// within one db transaction. Thus, for optional migrations, they must take the
// db backend and construct transactions as needed.
type optionalMigration func(db kvdb.Backend) error

// optionalVersion defines a db version that can be optionally applied. When
// applying migrations, we must apply all the mandatory migrations first before
// attempting optional ones.
type optionalVersion struct {
name string
migration optionalMigration
}

var (
// dbVersions is storing all versions of database. If current version
// of database don't match with latest version this list will be used
// for retrieving all migration function that are need to apply to the
// current db.
dbVersions = []version{
// dbVersions is storing all mandatory versions of database. If current
// version of database don't match with latest version this list will
// be used for retrieving all migration function that are need to apply
// to the current db.
dbVersions = []mandatoryVersion{
{
// The base DB version requires no migration.
number: 0,
Expand Down Expand Up @@ -237,6 +255,19 @@ var (
},
}

// optionalVersions stores all optional migrations that are applied
// after dbVersions.
//
// NOTE: optional migrations must be fault-tolerant and re-run already
// migrated data must be noop, which means the migration must be able
// to determine its state.
optionalVersions = []optionalVersion{
{
name: "prune revocation log",
migration: migration30.MigrateRevocationLog,
},
}

// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
Expand Down Expand Up @@ -337,6 +368,13 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
backend.Close()
return nil, err
}

// Grab the optional migration config.
omc := opts.OptionalMiragtionConfig
if err := chanDB.applyOptionalVersions(omc); err != nil {
backend.Close()
return nil, err
}
}

return chanDB, nil
Expand Down Expand Up @@ -1309,7 +1347,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
// syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
func (d *DB) syncVersions(versions []version) error {
func (d *DB) syncVersions(versions []mandatoryVersion) error {
meta, err := d.FetchMeta(nil)
if err != nil {
if err == ErrMetaNotFound {
Expand Down Expand Up @@ -1379,6 +1417,61 @@ func (d *DB) syncVersions(versions []version) error {
}, func() {})
}

// applyOptionalVersions takes a config to determine whether the optional
// migrations will be applied.
//
// NOTE: only support the prune_revocation_log optional migration atm.
func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we'd thread through that quit channel here to make the migration resumable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess to achieve this, we'll pass the ctx used in BuildDatabase through BuildDatabase -> CreateWithBackend -> applyOptionalVersions -> version.migration and catch the cancel signal there?

om, err := d.fetchOptionalMeta()
if err != nil {
if err == ErrMetaNotFound {
om = &OptionalMeta{
Versions: make(map[uint64]string),
}
} else {
return err
}
}

log.Infof("Checking for optional update: prune_revocation_log=%v, "+
"db_version=%s", cfg.PruneRevocationLog, om)

// Exit early if the optional migration is not specified.
if !cfg.PruneRevocationLog {
return nil
}

// Exit early if the optional migration has already been applied.
if _, ok := om.Versions[0]; ok {
return nil
}

// Get the optional version.
version := optionalVersions[0]
log.Infof("Performing database optional migration: %s", version.name)

// Migrate the data.
if err := version.migration(d); err != nil {
log.Errorf("Unable to apply optional migration: %s, error: %v",
version.name, err)
return err
}

// Update the optional meta. Notice that unlike the mandatory db
// migrations where we perform the migration and updating meta in a
// single db transaction, we use different transactions here. Even when
// the following update is failed, we should be fine here as we would
// re-run the optional migration again, which is a noop, during next
// startup.
om.Versions[0] = version.name
if err := d.putOptionalMeta(om); err != nil {
log.Errorf("Unable to update optional meta: %v", err)
return err
}

return nil
}

// ChannelGraph returns the current instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return d.graph
Expand All @@ -1390,13 +1483,15 @@ func (d *DB) ChannelStateDB() *ChannelStateDB {
return d.channelStateDB
}

func getLatestDBVersion(versions []version) uint32 {
func getLatestDBVersion(versions []mandatoryVersion) uint32 {
return versions[len(versions)-1].number
}

// getMigrationsToApply retrieves the migration function that should be
// applied to the database.
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
func getMigrationsToApply(versions []mandatoryVersion,
version uint32) ([]migration, []uint32) {

migrations := make([]migration, 0, len(versions))
migrationVersions := make([]uint32, 0, len(versions))

Expand Down
97 changes: 97 additions & 0 deletions channeldb/meta.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package channeldb

import (
"bytes"
"fmt"

"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/tlv"
)

var (
Expand All @@ -12,6 +16,10 @@ var (
// dbVersionKey is a boltdb key and it's used for storing/retrieving
// current database version.
dbVersionKey = []byte("dbp")

// dbVersionKey is a boltdb key and it's used for storing/retrieving
// a list of optional migrations that have been applied.
optionalVersionKey = []byte("ovk")
)

// Meta structure holds the database meta information.
Expand Down Expand Up @@ -80,3 +88,92 @@ func putDbVersion(metaBucket kvdb.RwBucket, meta *Meta) error {
byteOrder.PutUint32(scratch, meta.DbVersionNumber)
return metaBucket.Put(dbVersionKey, scratch)
}

// OptionalMeta structure holds the database optional migration information.
type OptionalMeta struct {
// Versions is a set that contains the versions that have been applied.
// When saved to disk, only the indexes are stored.
Versions map[uint64]string
}

func (om *OptionalMeta) String() string {
s := ""
for index, name := range om.Versions {
s += fmt.Sprintf("%d: %s", index, name)
}
if s == "" {
s = "empty"
}
return s
}

// fetchOptionalMeta reads the optional meta from the database.
func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) {
om := &OptionalMeta{
Versions: make(map[uint64]string),
}

err := kvdb.View(d, func(tx kvdb.RTx) error {
metaBucket := tx.ReadBucket(metaBucket)
if metaBucket == nil {
return ErrMetaNotFound
}

vBytes := metaBucket.Get(optionalVersionKey)
// Exit early if nothing found.
if vBytes == nil {
return nil
}

// Read the versions' length.
r := bytes.NewReader(vBytes)
vLen, err := tlv.ReadVarInt(r, &[8]byte{})
if err != nil {
return err
}

// Write the version index.
for i := uint64(0); i < vLen; i++ {
version, err := tlv.ReadVarInt(r, &[8]byte{})
if err != nil {
return err
}
om.Versions[version] = optionalVersions[i].name
}

return nil
}, func() {})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though we don't know of people running nodes on etcd if they do, it's safer if we reset any external state properly, like in this current block we update the om.Versions map which is external to the closure.

if err != nil {
return nil, err
}

return om, nil
}

// fetchOptionalMeta writes an optional meta to the database.
func (d *DB) putOptionalMeta(om *OptionalMeta) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
metaBucket, err := tx.CreateTopLevelBucket(metaBucket)
if err != nil {
return err
}

var b bytes.Buffer

// Write the total length.
err = tlv.WriteVarInt(&b, uint64(len(om.Versions)), &[8]byte{})
if err != nil {
return err
}

// Write the version indexes.
for v := range om.Versions {
err := tlv.WriteVarInt(&b, v, &[8]byte{})
if err != nil {
return err
}
}

return metaBucket.Put(optionalVersionKey, b.Bytes())
}, func() {})
}
86 changes: 84 additions & 2 deletions channeldb/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB),
t.Fatalf("unable to store meta data: %v", err)
}

versions := []version{
versions := []mandatoryVersion{
{
number: 0,
migration: nil,
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestOrderOfMigrations(t *testing.T) {
t.Parallel()

appliedMigration := -1
versions := []version{
versions := []mandatoryVersion{
{0, nil},
{1, nil},
{2, func(tx kvdb.RwTx) error {
Expand Down Expand Up @@ -498,3 +498,85 @@ func TestMigrationDryRun(t *testing.T) {
true,
true)
}

// TestOptionalMeta checks the basic read and write for the optional meta.
func TestOptionalMeta(t *testing.T) {
t.Parallel()

db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)

// Test read an empty optional meta.
om, err := db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
require.Empty(t, om.Versions, "expected empty versions")

// Test write an optional meta.
om = &OptionalMeta{
Versions: map[uint64]string{
0: optionalVersions[0].name,
},
}
err = db.putOptionalMeta(om)
require.NoError(t, err, "error putting optional meta")

om1, err := db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
require.Equal(t, om, om1, "unexpected empty versions")
require.Equal(t, "0: prune revocation log", om.String())
}

// TestApplyOptionalVersions checks that the optional migration is applied as
// expected based on the config.
func TestApplyOptionalVersions(t *testing.T) {
t.Parallel()

db, cleanUp, err := MakeTestDB()
defer cleanUp()
require.NoError(t, err)

// Overwrite the migration function so we can count how many times the
// migration has happened.
migrateCount := 0
optionalVersions[0].migration = func(_ kvdb.Backend) error {
migrateCount++
return nil
}

// Test that when the flag is false, no migration happens.
cfg := OptionalMiragtionConfig{}
err = db.applyOptionalVersions(cfg)
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 0, migrateCount, "expected no migration")

// Check the optional meta is not updated.
om, err := db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")
require.Empty(t, om.Versions, "expected empty versions")

// Test that when specified, the optional migration is applied.
cfg.PruneRevocationLog = true
err = db.applyOptionalVersions(cfg)
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 1, migrateCount, "expected migration")

// Fetch the updated optional meta.
om, err = db.fetchOptionalMeta()
require.NoError(t, err, "error getting optional meta")

// Verify that the optional meta is updated as expected.
omExpected := &OptionalMeta{
Versions: map[uint64]string{
0: optionalVersions[0].name,
},
}
require.Equal(t, omExpected, om, "unexpected empty versions")

// Test that though specified, the optional migration is not run since
// it's already been applied.
cfg.PruneRevocationLog = true
err = db.applyOptionalVersions(cfg)
require.NoError(t, err, "failed to apply optional migration")
require.Equal(t, 1, migrateCount, "expected no migration")
}
Loading