diff --git a/channeldb/db.go b/channeldb/db.go index 998ddf9cef..c30e6bdee6 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -23,6 +23,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration26" "github.com/lightningnetwork/lnd/channeldb/migration27" "github.com/lightningnetwork/lnd/channeldb/migration29" + "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/kvdb" @@ -45,17 +46,34 @@ var ( // up-to-date version of the database. type migration func(tx kvdb.RwTx) error -type version struct { +// mandatoryVersion defines a db version that must be applied before the lnd +// starts. +type mandatoryVersion struct { number uint32 migration migration } +// optionalMigration defines an optional migration function. When a migration +// is optional, it usually involves a large scale of changes that might touch +// millions of keys. Due to OOM concern, the update cannot be safely done +// within one db transaction. Thus, for optional migrations, they must take the +// db backend and construct transactions as needed. +type optionalMigration func(db kvdb.Backend) error + +// optionalVersion defines a db version that can be optionally applied. When +// applying migrations, we must apply all the mandatory migrations first before +// attempting optional ones. +type optionalVersion struct { + name string + migration optionalMigration +} + var ( - // dbVersions is storing all versions of database. If current version - // of database don't match with latest version this list will be used - // for retrieving all migration function that are need to apply to the - // current db. - dbVersions = []version{ + // dbVersions is storing all mandatory versions of database. If current + // version of database don't match with latest version this list will + // be used for retrieving all migration function that are need to apply + // to the current db. + dbVersions = []mandatoryVersion{ { // The base DB version requires no migration. number: 0, @@ -237,6 +255,19 @@ var ( }, } + // optionalVersions stores all optional migrations that are applied + // after dbVersions. + // + // NOTE: optional migrations must be fault-tolerant and re-run already + // migrated data must be noop, which means the migration must be able + // to determine its state. + optionalVersions = []optionalVersion{ + { + name: "prune revocation log", + migration: migration30.MigrateRevocationLog, + }, + } + // Big endian is the preferred byte order, due to cursor scans over // integer keys iterating in order. byteOrder = binary.BigEndian @@ -337,6 +368,13 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, backend.Close() return nil, err } + + // Grab the optional migration config. + omc := opts.OptionalMiragtionConfig + if err := chanDB.applyOptionalVersions(omc); err != nil { + backend.Close() + return nil, err + } } return chanDB, nil @@ -1309,7 +1347,7 @@ func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error { // syncVersions function is used for safe db version synchronization. It // applies migration functions to the current database and recovers the // previous state of db if at least one error/panic appeared during migration. -func (d *DB) syncVersions(versions []version) error { +func (d *DB) syncVersions(versions []mandatoryVersion) error { meta, err := d.FetchMeta(nil) if err != nil { if err == ErrMetaNotFound { @@ -1379,6 +1417,69 @@ func (d *DB) syncVersions(versions []version) error { }, func() {}) } +// applyOptionalVersions takes a config to determine whether the optional +// migrations will be applied. +// +// NOTE: only support the prune_revocation_log optional migration atm. +func (d *DB) applyOptionalVersions(cfg OptionalMiragtionConfig) error { + // TODO(yy): need to design the db to support dry run for optional + // migrations. + if d.dryRun { + log.Info("Skipped optional migrations as dry run mode is not " + + "supported yet") + return nil + } + + om, err := d.fetchOptionalMeta() + if err != nil { + if err == ErrMetaNotFound { + om = &OptionalMeta{ + Versions: make(map[uint64]string), + } + } else { + return err + } + } + + log.Infof("Checking for optional update: prune_revocation_log=%v, "+ + "db_version=%s", cfg.PruneRevocationLog, om) + + // Exit early if the optional migration is not specified. + if !cfg.PruneRevocationLog { + return nil + } + + // Exit early if the optional migration has already been applied. + if _, ok := om.Versions[0]; ok { + return nil + } + + // Get the optional version. + version := optionalVersions[0] + log.Infof("Performing database optional migration: %s", version.name) + + // Migrate the data. + if err := version.migration(d); err != nil { + log.Errorf("Unable to apply optional migration: %s, error: %v", + version.name, err) + return err + } + + // Update the optional meta. Notice that unlike the mandatory db + // migrations where we perform the migration and updating meta in a + // single db transaction, we use different transactions here. Even when + // the following update is failed, we should be fine here as we would + // re-run the optional migration again, which is a noop, during next + // startup. + om.Versions[0] = version.name + if err := d.putOptionalMeta(om); err != nil { + log.Errorf("Unable to update optional meta: %v", err) + return err + } + + return nil +} + // ChannelGraph returns the current instance of the directed channel graph. func (d *DB) ChannelGraph() *ChannelGraph { return d.graph @@ -1390,13 +1491,15 @@ func (d *DB) ChannelStateDB() *ChannelStateDB { return d.channelStateDB } -func getLatestDBVersion(versions []version) uint32 { +func getLatestDBVersion(versions []mandatoryVersion) uint32 { return versions[len(versions)-1].number } // getMigrationsToApply retrieves the migration function that should be // applied to the database. -func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) { +func getMigrationsToApply(versions []mandatoryVersion, + version uint32) ([]migration, []uint32) { + migrations := make([]migration, 0, len(versions)) migrationVersions := make([]uint32, 0, len(versions)) diff --git a/channeldb/log.go b/channeldb/log.go index 42b55ccf1b..2e2703ccd8 100644 --- a/channeldb/log.go +++ b/channeldb/log.go @@ -8,6 +8,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/migration13" "github.com/lightningnetwork/lnd/channeldb/migration16" "github.com/lightningnetwork/lnd/channeldb/migration24" + "github.com/lightningnetwork/lnd/channeldb/migration30" "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" "github.com/lightningnetwork/lnd/kvdb" ) @@ -38,5 +39,6 @@ func UseLogger(logger btclog.Logger) { migration13.UseLogger(logger) migration16.UseLogger(logger) migration24.UseLogger(logger) + migration30.UseLogger(logger) kvdb.UseLogger(logger) } diff --git a/channeldb/meta.go b/channeldb/meta.go index df4b0df349..9173224b84 100644 --- a/channeldb/meta.go +++ b/channeldb/meta.go @@ -1,7 +1,11 @@ package channeldb import ( + "bytes" + "fmt" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/tlv" ) var ( @@ -12,6 +16,10 @@ var ( // dbVersionKey is a boltdb key and it's used for storing/retrieving // current database version. dbVersionKey = []byte("dbp") + + // dbVersionKey is a boltdb key and it's used for storing/retrieving + // a list of optional migrations that have been applied. + optionalVersionKey = []byte("ovk") ) // Meta structure holds the database meta information. @@ -80,3 +88,92 @@ func putDbVersion(metaBucket kvdb.RwBucket, meta *Meta) error { byteOrder.PutUint32(scratch, meta.DbVersionNumber) return metaBucket.Put(dbVersionKey, scratch) } + +// OptionalMeta structure holds the database optional migration information. +type OptionalMeta struct { + // Versions is a set that contains the versions that have been applied. + // When saved to disk, only the indexes are stored. + Versions map[uint64]string +} + +func (om *OptionalMeta) String() string { + s := "" + for index, name := range om.Versions { + s += fmt.Sprintf("%d: %s", index, name) + } + if s == "" { + s = "empty" + } + return s +} + +// fetchOptionalMeta reads the optional meta from the database. +func (d *DB) fetchOptionalMeta() (*OptionalMeta, error) { + om := &OptionalMeta{ + Versions: make(map[uint64]string), + } + + err := kvdb.View(d, func(tx kvdb.RTx) error { + metaBucket := tx.ReadBucket(metaBucket) + if metaBucket == nil { + return ErrMetaNotFound + } + + vBytes := metaBucket.Get(optionalVersionKey) + // Exit early if nothing found. + if vBytes == nil { + return nil + } + + // Read the versions' length. + r := bytes.NewReader(vBytes) + vLen, err := tlv.ReadVarInt(r, &[8]byte{}) + if err != nil { + return err + } + + // Write the version index. + for i := uint64(0); i < vLen; i++ { + version, err := tlv.ReadVarInt(r, &[8]byte{}) + if err != nil { + return err + } + om.Versions[version] = optionalVersions[i].name + } + + return nil + }, func() {}) + if err != nil { + return nil, err + } + + return om, nil +} + +// fetchOptionalMeta writes an optional meta to the database. +func (d *DB) putOptionalMeta(om *OptionalMeta) error { + return kvdb.Update(d, func(tx kvdb.RwTx) error { + metaBucket, err := tx.CreateTopLevelBucket(metaBucket) + if err != nil { + return err + } + + var b bytes.Buffer + + // Write the total length. + err = tlv.WriteVarInt(&b, uint64(len(om.Versions)), &[8]byte{}) + if err != nil { + return err + } + + // Write the version indexes. + for v := range om.Versions { + err := tlv.WriteVarInt(&b, v, &[8]byte{}) + if err != nil { + return err + } + } + + return metaBucket.Put(optionalVersionKey, b.Bytes()) + }, func() {}) +} diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index a2366cfc37..830cee5ba5 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -44,7 +44,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), t.Fatalf("unable to store meta data: %v", err) } - versions := []version{ + versions := []mandatoryVersion{ { number: 0, migration: nil, @@ -124,7 +124,7 @@ func TestOrderOfMigrations(t *testing.T) { t.Parallel() appliedMigration := -1 - versions := []version{ + versions := []mandatoryVersion{ {0, nil}, {1, nil}, {2, func(tx kvdb.RwTx) error { @@ -498,3 +498,85 @@ func TestMigrationDryRun(t *testing.T) { true, true) } + +// TestOptionalMeta checks the basic read and write for the optional meta. +func TestOptionalMeta(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + defer cleanUp() + require.NoError(t, err) + + // Test read an empty optional meta. + om, err := db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + require.Empty(t, om.Versions, "expected empty versions") + + // Test write an optional meta. + om = &OptionalMeta{ + Versions: map[uint64]string{ + 0: optionalVersions[0].name, + }, + } + err = db.putOptionalMeta(om) + require.NoError(t, err, "error putting optional meta") + + om1, err := db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + require.Equal(t, om, om1, "unexpected empty versions") + require.Equal(t, "0: prune revocation log", om.String()) +} + +// TestApplyOptionalVersions checks that the optional migration is applied as +// expected based on the config. +func TestApplyOptionalVersions(t *testing.T) { + t.Parallel() + + db, cleanUp, err := MakeTestDB() + defer cleanUp() + require.NoError(t, err) + + // Overwrite the migration function so we can count how many times the + // migration has happened. + migrateCount := 0 + optionalVersions[0].migration = func(_ kvdb.Backend) error { + migrateCount++ + return nil + } + + // Test that when the flag is false, no migration happens. + cfg := OptionalMiragtionConfig{} + err = db.applyOptionalVersions(cfg) + require.NoError(t, err, "failed to apply optional migration") + require.Equal(t, 0, migrateCount, "expected no migration") + + // Check the optional meta is not updated. + om, err := db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + require.Empty(t, om.Versions, "expected empty versions") + + // Test that when specified, the optional migration is applied. + cfg.PruneRevocationLog = true + err = db.applyOptionalVersions(cfg) + require.NoError(t, err, "failed to apply optional migration") + require.Equal(t, 1, migrateCount, "expected migration") + + // Fetch the updated optional meta. + om, err = db.fetchOptionalMeta() + require.NoError(t, err, "error getting optional meta") + + // Verify that the optional meta is updated as expected. + omExpected := &OptionalMeta{ + Versions: map[uint64]string{ + 0: optionalVersions[0].name, + }, + } + require.Equal(t, omExpected, om, "unexpected empty versions") + + // Test that though specified, the optional migration is not run since + // it's already been applied. + cfg.PruneRevocationLog = true + err = db.applyOptionalVersions(cfg) + require.NoError(t, err, "failed to apply optional migration") + require.Equal(t, 1, migrateCount, "expected no migration") +} diff --git a/channeldb/migration30/iterator.go b/channeldb/migration30/iterator.go new file mode 100644 index 0000000000..d4e08cbf4c --- /dev/null +++ b/channeldb/migration30/iterator.go @@ -0,0 +1,338 @@ +package migration30 + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + + mig25 "github.com/lightningnetwork/lnd/channeldb/migration25" + + "github.com/lightningnetwork/lnd/kvdb" +) + +var ( + // openChanBucket stores all the currently open channels. This bucket + // has a second, nested bucket which is keyed by a node's ID. Within + // that node ID bucket, all attributes required to track, update, and + // close a channel are stored. + openChannelBucket = []byte("open-chan-bucket") + + // errExit is returned when the callback function used in iterator + // needs to exit the iteration. + errExit = errors.New("exit condition met") +) + +// updateLocator defines a locator that can be used to find the next record to +// be migrated. This is useful when an interrupted migration that leads to a +// mixed revocation log formats saved in our database, we can then restart the +// migration using the locator to continue migrating the rest. +type updateLocator struct { + // nodePub, chainHash and fundingOutpoint are used to locate the + // channel bucket. + nodePub []byte + chainHash []byte + fundingOutpoint []byte + + // nextHeight is used to locate the next old revocation log to be + // migrated. A nil value means we've finished the migration. + nextHeight []byte +} + +// fetchChanBucket is a helper function that returns the bucket where a +// channel's data resides in given: the public key for the node, the outpoint, +// and the chainhash that the channel resides on. +func (ul *updateLocator) locateChanBucket(rootBucket kvdb.RwBucket) ( + kvdb.RwBucket, error) { + + // Within this top level bucket, fetch the bucket dedicated to storing + // open channel data specific to the remote node. + nodeChanBucket := rootBucket.NestedReadWriteBucket(ul.nodePub) + if nodeChanBucket == nil { + return nil, mig25.ErrNoActiveChannels + } + + // We'll then recurse down an additional layer in order to fetch the + // bucket for this particular chain. + chainBucket := nodeChanBucket.NestedReadWriteBucket(ul.chainHash) + if chainBucket == nil { + return nil, mig25.ErrNoActiveChannels + } + + // With the bucket for the node and chain fetched, we can now go down + // another level, for this channel itself. + chanBucket := chainBucket.NestedReadWriteBucket(ul.fundingOutpoint) + if chanBucket == nil { + return nil, mig25.ErrChannelNotFound + } + + return chanBucket, nil +} + +// findNextMigrateHeight finds the next commit height that's not migrated. It +// returns the commit height bytes found. A nil return value means the +// migration has been completed for this particular channel bucket. +func findNextMigrateHeight(chanBucket kvdb.RwBucket) []byte { + // Read the old log bucket. The old bucket doesn't exist, indicating + // either we don't have any old logs for this channel, or the migration + // has been finished and the old bucket has been deleted. + oldBucket := chanBucket.NestedReadBucket( + revocationLogBucketDeprecated, + ) + if oldBucket == nil { + return nil + } + + // Acquire a read cursor for the old bucket. + oldCursor := oldBucket.ReadCursor() + + // Read the new log bucket. The sub-bucket hasn't been created yet, + // indicating we haven't migrated any logs under this channel. In this + // case, we'll return the first commit height found from the old + // revocation log bucket as the next height. + logBucket := chanBucket.NestedReadBucket(revocationLogBucket) + if logBucket == nil { + nextHeight, _ := oldCursor.First() + return nextHeight + } + + // Acquire a read cursor for the new bucket. + cursor := logBucket.ReadCursor() + + // Read the last migrated record. If the key is nil, we haven't + // migrated any logs yet. In this case we return the first commit + // height found from the old revocation log bucket. For instance, + // - old log: [1, 2] + // - new log: [] + // We will return the first key [1]. + migratedHeight, _ := cursor.Last() + if migratedHeight == nil { + nextHeight, _ := oldCursor.First() + return nextHeight + } + + // Read the last height from the old log bucket. + endHeight, _ := oldCursor.Last() + + switch bytes.Compare(migratedHeight, endHeight) { + // If the height of the last old revocation equals to the migrated + // height, we've done migrating for this channel. For instance, + // - old log: [1, 2] + // - new log: [1, 2] + case 0: + return nil + + // If the migrated height is smaller, it means this is a resumed + // migration. In this case we will return the next height found in the + // old bucket. For instance, + // - old log: [1, 2] + // - new log: [1] + // We will return the key [2]. + case -1: + // Now point the cursor to the migratedHeight. If we cannot + // find this key from the old log bucket, the database might be + // corrupted. In this case, we would return the first key so + // that we would redo the migration for this chan bucket. + matchedHeight, _ := oldCursor.Seek(migratedHeight) + + // NOTE: because Seek will return the next key when the passed + // key cannot be found, we need to compare the `matchedHeight` + // to decide whether `migratedHeight` is found or not. + if !bytes.Equal(matchedHeight, migratedHeight) { + log.Warnf("Old revocation bucket doesn't have "+ + "CommitHeight=%v yet it's found in the new "+ + "bucket. It's likely the new revocation log "+ + "bucket is corrupted. Migrations will be"+ + "applied again.", + binary.BigEndian.Uint64(migratedHeight)) + + // Now return the first height found in the old bucket + // so we can redo the migration. + nextHeight, _ := oldCursor.First() + return nextHeight + } + + // Otherwise, find the next height to be migrated. + nextHeight, _ := oldCursor.Next() + return nextHeight + + // If the migrated height is greater, it means this node has new logs + // saved after v0.15.0. In this case, we need to further decide whether + // the old logs have been migrated or not. + case 1: + } + + // If we ever reached here, it means we have a mixed of new and old + // logs saved. Suppose we have old logs as, + // - old log: [1, 2] + // We'd have four possible scenarios, + // - new log: [ 3, 4] <- no migration happened, return [1]. + // - new log: [1, 3, 4] <- resumed migration, return [2]. + // - new log: [ 2, 3, 4] <- corrupted migration, return [1]. + // - new log: [1, 2, 3, 4] <- finished migration, return nil. + // To find the next migration height, we will iterate the old logs to + // grab the heights and query them in the new bucket until an height + // cannot be found, which is our next migration height. Or, if the old + // heights can all be found, it indicates a finished migration. + + // Move the cursor to the first record. + oldKey, _ := oldCursor.First() + + // NOTE: this action can be time-consuming as we are iterating the + // records and compare them. However, we would only ever hit here if + // this is a resumed migration with new logs created after v.0.15.0. + for { + // Try to locate the old key in the new bucket. If it cannot be + // found, it will be the next migrate height. + newKey, _ := cursor.Seek(oldKey) + + // If the old key is not found in the new bucket, return it as + // our next migration height. + // + // NOTE: because Seek will return the next key when the passed + // key cannot be found, we need to compare the keys to deicde + // whether the old key is found or not. + if !bytes.Equal(newKey, oldKey) { + return oldKey + } + + // Otherwise, keep iterating the old bucket. + oldKey, _ = oldCursor.Next() + + // If we've done iterating, yet all the old keys can be found + // in the new bucket, this means the migration has been + // finished. + if oldKey == nil { + return nil + } + } +} + +// locateNextUpdateNum returns a locator that's used to start our migration. A +// nil locator means the migration has been finished. +func locateNextUpdateNum(openChanBucket kvdb.RwBucket) (*updateLocator, error) { + locator := &updateLocator{} + + // cb is the callback function to be used when iterating the buckets. + cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error { + locator = l + + updateNum := findNextMigrateHeight(chanBucket) + + // We've found the next commit height and can now exit. + if updateNum != nil { + locator.nextHeight = updateNum + return errExit + } + return nil + } + + // Iterate the buckets. If we received an exit signal, return the + // locator. + err := iterateBuckets(openChanBucket, nil, cb) + if err == errExit { + log.Debugf("found locator: nodePub=%x, fundingOutpoint=%x, "+ + "nextHeight=%x", locator.nodePub, locator.chainHash, + locator.nextHeight) + return locator, nil + } + + // If the err is nil, we've iterated all the sub-buckets and the + // migration is finished. + return nil, err +} + +// callback defines a type that's used by the iterator. +type callback func(k, v []byte) error + +// iterator is a helper function that iterates a given bucket and performs the +// callback function on each key. If a seeker is specified, it will move the +// cursor to the given position otherwise it will start from the first item. +func iterator(bucket kvdb.RBucket, seeker []byte, cb callback) error { + c := bucket.ReadCursor() + k, v := c.First() + + // Move the cursor to the specified position if seeker is non-nil. + if seeker != nil { + k, v = c.Seek(seeker) + } + + // Start the iteration and exit on condition. + for k, v := k, v; k != nil; k, v = c.Next() { + // cb might return errExit to signal exiting the iteration. + if err := cb(k, v); err != nil { + return err + } + } + return nil +} + +// step defines the callback type that's used when iterating the buckets. +type step func(bucket kvdb.RwBucket, l *updateLocator) error + +// iterateBuckets locates the cursor at a given position specified by the +// updateLocator and starts the iteration. If a nil locator is passed, it will +// start the iteration from the beginning. During each iteration, the callback +// function is called and it may exit the iteration when the callback returns +// an errExit to signal an exit condition. +func iterateBuckets(openChanBucket kvdb.RwBucket, + l *updateLocator, cb step) error { + + // If the locator is nil, we will initiate an empty one, which is + // further used by the iterator. + if l == nil { + l = &updateLocator{} + } + + // iterChanBucket iterates the chain bucket to act on each of the + // channel buckets. + iterChanBucket := func(chain kvdb.RwBucket, + k1, k2, _ []byte, cb step) error { + + return iterator( + chain, l.fundingOutpoint, + func(k3, _ []byte) error { + // Read the sub-bucket level 3. + chanBucket := chain.NestedReadWriteBucket(k3) + if chanBucket == nil { + return fmt.Errorf("no bucket for "+ + "chanPoint=%x", k3) + } + + // Construct a new locator at this position. + locator := &updateLocator{ + nodePub: k1, + chainHash: k2, + fundingOutpoint: k3, + } + + // Set the seeker to nil so it won't affect + // other buckets. + l.fundingOutpoint = nil + + return cb(chanBucket, locator) + }) + } + + return iterator(openChanBucket, l.nodePub, func(k1, v []byte) error { + // Read the sub-bucket level 1. + node := openChanBucket.NestedReadWriteBucket(k1) + if node == nil { + return fmt.Errorf("no bucket for node %x", k1) + } + + return iterator(node, l.chainHash, func(k2, v []byte) error { + // Read the sub-bucket level 2. + chain := node.NestedReadWriteBucket(k2) + if chain == nil { + return fmt.Errorf("no bucket for chain=%x", k2) + } + + // Set the seeker to nil so it won't affect other + // buckets. + l.chainHash = nil + + return iterChanBucket(chain, k1, k2, v, cb) + }) + }) +} diff --git a/channeldb/migration30/iterator_test.go b/channeldb/migration30/iterator_test.go new file mode 100644 index 0000000000..7593124f52 --- /dev/null +++ b/channeldb/migration30/iterator_test.go @@ -0,0 +1,697 @@ +package migration30 + +import ( + "bytes" + "testing" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/stretchr/testify/require" + + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" + mig25 "github.com/lightningnetwork/lnd/channeldb/migration25" + mig26 "github.com/lightningnetwork/lnd/channeldb/migration26" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" +) + +var ( + testRefundTimeout = uint32(740_000) + testIncoming = true + testRHash = bytes.Repeat([]byte{1}, 32) + + testOutputIndex = int32(0) + testHTLCAmt = lnwire.MilliSatoshi(1000_000) + testLocalAmt = btcutil.Amount(10_000) + testRemoteAmt = btcutil.Amount(20_000) + + testTx = &wire.MsgTx{ + Version: 1, + TxIn: []*wire.TxIn{ + { + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: 0xffffffff, + }, + Sequence: 0xffffffff, + }, + }, + TxOut: []*wire.TxOut{ + {Value: int64(testHTLCAmt.ToSatoshis())}, + {Value: int64(testLocalAmt)}, + {Value: int64(testRemoteAmt)}, + }, + LockTime: 5, + } +) + +// TestLocateChanBucket checks that the updateLocator can successfully locate a +// chanBucket or returns an error. +func TestLocateChanBucket(t *testing.T) { + t.Parallel() + + // Create test database. + cdb, cleanUp, err := migtest.MakeDB() + defer cleanUp() + require.NoError(t, err) + + // Create a test channel. + c := createTestChannel(nil) + + var buf bytes.Buffer + require.NoError(t, mig.WriteOutpoint(&buf, &c.FundingOutpoint)) + + // Prepare the info needed to query the bucket. + nodePub := c.IdentityPub.SerializeCompressed() + chainHash := c.ChainHash[:] + cp := buf.Bytes() + + // Create test buckets. + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + _, err := mig25.CreateChanBucket(tx, &c.OpenChannel) + if err != nil { + return err + } + return nil + }, func() {}) + require.NoError(t, err) + + // testLocator is a helper closure that tests a given locator's + // locateChanBucket method. + testLocator := func(l *updateLocator) error { + return kvdb.Update(cdb, func(tx kvdb.RwTx) error { + rootBucket := tx.ReadWriteBucket(openChannelBucket) + _, err := l.locateChanBucket(rootBucket) + return err + }, func() {}) + } + + testCases := []struct { + name string + locator *updateLocator + expectedErr error + }{ + { + name: "empty node pub key", + locator: &updateLocator{}, + expectedErr: mig25.ErrNoActiveChannels, + }, + { + name: "empty chainhash", + locator: &updateLocator{ + nodePub: nodePub, + }, + expectedErr: mig25.ErrNoActiveChannels, + }, + { + name: "empty funding outpoint", + locator: &updateLocator{ + nodePub: nodePub, + chainHash: chainHash, + }, + expectedErr: mig25.ErrChannelNotFound, + }, + { + name: "successful query", + locator: &updateLocator{ + nodePub: nodePub, + chainHash: chainHash, + fundingOutpoint: cp, + }, + expectedErr: nil, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + err := testLocator(tc.locator) + require.Equal(t, tc.expectedErr, err) + }) + } +} + +// TestFindNextMigrateHeight checks that given a channel bucket, we can +// successfully find the next un-migrated commit height. +func TestFindNextMigrateHeight(t *testing.T) { + t.Parallel() + + // Create test database. + cdb, cleanUp, err := migtest.MakeDB() + defer cleanUp() + require.NoError(t, err) + + // tester is a helper closure that finds the next migration height. + tester := func(c *mig26.OpenChannel) []byte { + var height []byte + err := kvdb.Update(cdb, func(tx kvdb.RwTx) error { + chanBucket, err := mig25.FetchChanBucket( + tx, &c.OpenChannel, + ) + if err != nil { + return err + } + + height = findNextMigrateHeight(chanBucket) + return nil + }, func() {}) + require.NoError(t, err) + + return height + } + + testCases := []struct { + name string + oldLogs []mig.ChannelCommitment + newLogs []mig.ChannelCommitment + expectedHeight []byte + }{ + { + // When we don't have any old logs, our next migration + // height would be nil. + name: "empty old logs", + expectedHeight: nil, + }, + { + // When we don't have any migrated logs, our next + // migration height would be the first height found in + // the old logs. + name: "empty migrated logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, + }, + { + // When we have migrated logs, the next migration + // height should be the first height found in the old + // logs but not in the migrated logs. + name: "have migrated logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2}, + }, + { + // When both the logs have equal indexes, the next + // migration should be nil as we've finished migrating + // for this bucket. + name: "have finished logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + expectedHeight: nil, + }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we don't have any migrated logs, the + // next migration height should be the first height + // found in the old bucket. + name: "have new logs but no migrated logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, + }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we have migrated logs, the returned + // value should be the next un-migrated height. + name: "have new logs and migrated logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 2}, + }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we have corrupted logs, the returned + // value should be the first height in the old bucket. + name: "have new logs but missing logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(2), + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: []byte{0, 0, 0, 0, 0, 0, 0, 1}, + }, + { + // When there are new logs saved in the new bucket, + // which happens when the node is running with + // v.0.15.0, and we have finished the migration, we + // expect a nil height to be returned. + name: "have new logs and finished logs", + oldLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + }, + newLogs: []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + createDummyChannelCommit(3), + createDummyChannelCommit(4), + }, + expectedHeight: nil, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + // Create a test channel. + c := createTestChannel(nil) + + // Setup the database. + err := setupTestLogs(cdb, c, tc.oldLogs, tc.newLogs) + require.NoError(t, err) + + // Run the test and check the expected next migration + // height is returned. + height := tester(c) + require.Equal(t, tc.expectedHeight, height) + }) + } +} + +// TestIterator checks that the iterator iterate the given bucket correctly. +func TestIterator(t *testing.T) { + t.Parallel() + + // Create test database. + cdb, cleanUp, err := migtest.MakeDB() + defer cleanUp() + require.NoError(t, err) + + // exitKey is used to signal exit when hitting this key. + exitKey := []byte{1} + + // seekKey is used to position the cursor. + seekKey := []byte{2} + + // endKey is the last key saved in the test bucket. + endKey := []byte{3} + + // Create test bucket. + bucketName := []byte("test-bucket") + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + bucket, err := tx.CreateTopLevelBucket(bucketName) + if err != nil { + return err + } + if err := bucket.Put(exitKey, testRHash); err != nil { + return err + } + if err := bucket.Put(seekKey, testRHash); err != nil { + return err + } + + return bucket.Put(endKey, testRHash) + }, func() {}) + require.NoError(t, err) + + // tester is a helper closure that tests the iterator. + tester := func(seeker []byte, cb callback, expectedErr error) { + err := kvdb.View(cdb, func(tx kvdb.RTx) error { + bucket := tx.ReadBucket(bucketName) + return iterator(bucket, seeker, cb) + }, func() {}) + + // Check the err is returned as expected. + require.Equal(t, expectedErr, err) + } + + // keysItered records the keys have been iterated. + keysItered := make([][]byte, 0) + + // testCb creates a dummy callback that saves the keys it have + // iterated. + testCb := func(k, v []byte) error { + keysItered = append(keysItered, k) + if bytes.Equal(k, exitKey) { + return errExit + } + return nil + } + + // Test that without a seeker, we would iterate from the beginning, + // which will end up iterating only one key since we would exit on it. + tester(nil, testCb, errExit) + require.Equal(t, [][]byte{exitKey}, keysItered) + + // Reset the keys. + keysItered = make([][]byte, 0) + + // Now test that when we use a seeker, we would start our iteration at + // the seeker posisiton. This means we won't exit it early since we've + // skipped the exitKey. + tester(seekKey, testCb, nil) + require.Equal(t, [][]byte{seekKey, endKey}, keysItered) +} + +// TestIterateBuckets checks that we can successfully iterate the buckets and +// update the locator during the iteration. +func TestIterateBuckets(t *testing.T) { + t.Parallel() + + // Create test database. + cdb, cleanUp, err := migtest.MakeDB() + defer cleanUp() + require.NoError(t, err) + + // Create three test channels. + c1 := createTestChannel(nil) + c2 := createTestChannel(nil) + c3 := createTestChannel(nil) + + // Create test buckets. + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + _, err := mig25.CreateChanBucket(tx, &c1.OpenChannel) + if err != nil { + return err + } + + _, err = mig25.CreateChanBucket(tx, &c2.OpenChannel) + if err != nil { + return err + } + + _, err = mig25.CreateChanBucket(tx, &c3.OpenChannel) + if err != nil { + return err + } + + return nil + }, func() {}) + require.NoError(t, err) + + // testCb creates a dummy callback that saves the locator it received. + locators := make([]*updateLocator, 0) + testCb := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam + locators = append(locators, l) + return nil + } + + // Iterate the buckets with a nil locator. + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + bucket := tx.ReadWriteBucket(openChannelBucket) + return iterateBuckets(bucket, nil, testCb) + }, func() {}) + require.NoError(t, err) + + // We should see three locators. + require.Len(t, locators, 3) + + // We now test we can iterate the buckets using a locator. + // + // Copy the locator which points to the second channel. + locator := &updateLocator{ + nodePub: locators[1].nodePub, + chainHash: locators[1].chainHash, + fundingOutpoint: locators[1].fundingOutpoint, + } + + // Reset the locators. + locators = make([]*updateLocator, 0) + + // Iterate the buckets with a locator. + err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { + bucket := tx.ReadWriteBucket(openChannelBucket) + return iterateBuckets(bucket, locator, testCb) + }, func() {}) + require.NoError(t, err) + + // We should see two locators. + require.Len(t, locators, 2) +} + +// TestLocalNextUpdateNum checks that we can successfully locate the next +// migration target record. +func TestLocalNextUpdateNum(t *testing.T) { + t.Parallel() + + // assertLocator checks the locator has expected values in its fields. + assertLocator := func(t *testing.T, c *mig26.OpenChannel, + height []byte, l *updateLocator) { + + var buf bytes.Buffer + require.NoError( + t, mig.WriteOutpoint(&buf, &c.FundingOutpoint), + ) + + // Prepare the info needed to validate the locator. + nodePub := c.IdentityPub.SerializeCompressed() + chainHash := c.ChainHash[:] + cp := buf.Bytes() + + require.Equal(t, nodePub, l.nodePub, "wrong nodePub") + require.Equal(t, chainHash, l.chainHash, "wrong chainhash") + require.Equal(t, cp, l.fundingOutpoint, "wrong outpoint") + require.Equal(t, height, l.nextHeight, "wrong nextHeight") + } + + // createTwoChannels is a helper closure that creates two testing + // channels and returns the channels sorted by their nodePub to match + // how they are stored in boltdb. + createTwoChannels := func() (*mig26.OpenChannel, *mig26.OpenChannel) { + c1 := createTestChannel(nil) + c2 := createTestChannel(nil) + + // If c1 is greater than c2, boltdb will put c2 before c1. + if bytes.Compare( + c1.IdentityPub.SerializeCompressed(), + c2.IdentityPub.SerializeCompressed(), + ) > 0 { + + c1, c2 = c2, c1 + } + + return c1, c2 + } + + // createNotFinished will setup a situation where we have un-migrated + // logs and return the next migration height. + createNotFinished := func(cdb kvdb.Backend, + c *mig26.OpenChannel) []byte { + + // Create test logs. + oldLogs := []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + } + newLogs := []mig.ChannelCommitment{ + createDummyChannelCommit(1), + } + err := setupTestLogs(cdb, c, oldLogs, newLogs) + require.NoError(t, err) + + return []byte{0, 0, 0, 0, 0, 0, 0, 2} + } + + // createFinished will setup a situation where all the old logs have + // been migrated and return a nil. + createFinished := func(cdb kvdb.Backend, c *mig26.OpenChannel) []byte { // nolint:unparam + // Create test logs. + oldLogs := []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + } + newLogs := []mig.ChannelCommitment{ + createDummyChannelCommit(1), + createDummyChannelCommit(2), + } + err := setupTestLogs(cdb, c, oldLogs, newLogs) + require.NoError(t, err) + + return nil + } + + // emptyChannel builds a test case where no channel buckets exist. + emptyChannel := func(cdb kvdb.Backend) ( + *mig26.OpenChannel, []byte) { + + // Create the root bucket. + err := setupTestLogs(cdb, nil, nil, nil) + require.NoError(t, err) + + return nil, nil + } + + // singleChannelNoLogs builds a test case where we have a single + // channel without any revocation logs. + singleChannelNoLogs := func(cdb kvdb.Backend) ( + *mig26.OpenChannel, []byte) { + + // Create a test channel. + c := createTestChannel(nil) + + // Create test logs. + err := setupTestLogs(cdb, c, nil, nil) + require.NoError(t, err) + + return c, nil + } + + // singleChannelNotFinished builds a test case where we have a single + // channel and have unfinished old logs. + singleChannelNotFinished := func(cdb kvdb.Backend) ( + *mig26.OpenChannel, []byte) { + + c := createTestChannel(nil) + return c, createNotFinished(cdb, c) + } + + // singleChannelFinished builds a test where we have a single channel + // and have finished all the migration. + singleChannelFinished := func(cdb kvdb.Backend) ( + *mig26.OpenChannel, []byte) { + + c := createTestChannel(nil) + return c, createFinished(cdb, c) + } + + // twoChannelsNotFinished builds a test case where we have two channels + // and have unfinished old logs. + twoChannelsNotFinished := func(cdb kvdb.Backend) ( + *mig26.OpenChannel, []byte) { + + c1, c2 := createTwoChannels() + createFinished(cdb, c1) + return c2, createNotFinished(cdb, c2) + } + + // twoChannelsFinished builds a test case where we have two channels + // and have finished the migration. + twoChannelsFinished := func(cdb kvdb.Backend) ( + *mig26.OpenChannel, []byte) { + + c1, c2 := createTwoChannels() + createFinished(cdb, c1) + return c2, createFinished(cdb, c2) + } + + type setupFunc func(cdb kvdb.Backend) (*mig26.OpenChannel, []byte) + + testCases := []struct { + name string + setup setupFunc + expectFinish bool + }{ + { + name: "empty buckets", + setup: emptyChannel, + expectFinish: true, + }, + { + name: "single channel no logs", + setup: singleChannelNoLogs, + expectFinish: true, + }, + { + name: "single channel not finished", + setup: singleChannelNotFinished, + expectFinish: false, + }, + { + name: "single channel finished", + setup: singleChannelFinished, + expectFinish: true, + }, + { + name: "two channels not finished", + setup: twoChannelsNotFinished, + expectFinish: false, + }, + { + name: "two channels finished", + setup: twoChannelsFinished, + expectFinish: true, + }, + } + + // tester is a helper closure that finds the locator. + tester := func(t *testing.T, cdb kvdb.Backend) *updateLocator { + var l *updateLocator + err := kvdb.Update(cdb, func(tx kvdb.RwTx) error { + rootBucket := tx.ReadWriteBucket(openChannelBucket) + + // Find the locator. + locator, err := locateNextUpdateNum(rootBucket) + if err != nil { + return err + } + + l = locator + return nil + }, func() {}) + require.NoError(t, err) + + return l + } + + for _, tc := range testCases { + // Create a test database. + cdb, cleanUp, err := migtest.MakeDB() + defer cleanUp() + require.NoError(t, err) + + tc := tc + t.Run(tc.name, func(t *testing.T) { + // Setup the test case. + c, height := tc.setup(cdb) + + // Run the test and assert the locator. + locator := tester(t, cdb) + if tc.expectFinish { + require.Nil(t, locator, "expected nil locator") + } else { + assertLocator(t, c, height, locator) + } + }) + } +} + +func createDummyChannelCommit(height uint64) mig.ChannelCommitment { + htlc := mig.HTLC{ + Amt: testHTLCAmt, + RefundTimeout: testRefundTimeout, + OutputIndex: testOutputIndex, + Incoming: testIncoming, + } + copy(htlc.RHash[:], testRHash) + c := mig.ChannelCommitment{ + CommitHeight: height, + Htlcs: []mig.HTLC{htlc}, + CommitTx: testTx, + } + return c +} diff --git a/channeldb/migration30/lnwallet.go b/channeldb/migration30/lnwallet.go new file mode 100644 index 0000000000..d3bf885dc8 --- /dev/null +++ b/channeldb/migration30/lnwallet.go @@ -0,0 +1,361 @@ +package migration30 + +import ( + "bytes" + + mig25 "github.com/lightningnetwork/lnd/channeldb/migration25" + mig26 "github.com/lightningnetwork/lnd/channeldb/migration26" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/input" +) + +// CommitmentKeyRing holds all derived keys needed to construct commitment and +// HTLC transactions. The keys are derived differently depending whether the +// commitment transaction is ours or the remote peer's. Private keys associated +// with each key may belong to the commitment owner or the "other party" which +// is referred to in the field comments, regardless of which is local and which +// is remote. +type CommitmentKeyRing struct { + // CommitPoint is the "per commitment point" used to derive the tweak + // for each base point. + CommitPoint *btcec.PublicKey + + // LocalCommitKeyTweak is the tweak used to derive the local public key + // from the local payment base point or the local private key from the + // base point secret. This may be included in a SignDescriptor to + // generate signatures for the local payment key. + // + // NOTE: This will always refer to "our" local key, regardless of + // whether this is our commit or not. + LocalCommitKeyTweak []byte + + // TODO(roasbeef): need delay tweak as well? + + // LocalHtlcKeyTweak is the tweak used to derive the local HTLC key + // from the local HTLC base point. This value is needed in order to + // derive the final key used within the HTLC scripts in the commitment + // transaction. + // + // NOTE: This will always refer to "our" local HTLC key, regardless of + // whether this is our commit or not. + LocalHtlcKeyTweak []byte + + // LocalHtlcKey is the key that will be used in any clause paying to + // our node of any HTLC scripts within the commitment transaction for + // this key ring set. + // + // NOTE: This will always refer to "our" local HTLC key, regardless of + // whether this is our commit or not. + LocalHtlcKey *btcec.PublicKey + + // RemoteHtlcKey is the key that will be used in clauses within the + // HTLC script that send money to the remote party. + // + // NOTE: This will always refer to "their" remote HTLC key, regardless + // of whether this is our commit or not. + RemoteHtlcKey *btcec.PublicKey + + // ToLocalKey is the commitment transaction owner's key which is + // included in HTLC success and timeout transaction scripts. This is + // the public key used for the to_local output of the commitment + // transaction. + // + // NOTE: Who's key this is depends on the current perspective. If this + // is our commitment this will be our key. + ToLocalKey *btcec.PublicKey + + // ToRemoteKey is the non-owner's payment key in the commitment tx. + // This is the key used to generate the to_remote output within the + // commitment transaction. + // + // NOTE: Who's key this is depends on the current perspective. If this + // is our commitment this will be their key. + ToRemoteKey *btcec.PublicKey + + // RevocationKey is the key that can be used by the other party to + // redeem outputs from a revoked commitment transaction if it were to + // be published. + // + // NOTE: Who can sign for this key depends on the current perspective. + // If this is our commitment, it means the remote node can sign for + // this key in case of a breach. + RevocationKey *btcec.PublicKey +} + +// ScriptInfo holds a redeem script and hash. +type ScriptInfo struct { + // PkScript is the output's PkScript. + PkScript []byte + + // WitnessScript is the full script required to properly redeem the + // output. This field should be set to the full script if a p2wsh + // output is being signed. For p2wkh it should be set equal to the + // PkScript. + WitnessScript []byte +} + +// findOutputIndexesFromRemote finds the index of our and their outputs from +// the remote commitment transaction. It derives the key ring to compute the +// output scripts and compares them against the outputs inside the commitment +// to find the match. +func findOutputIndexesFromRemote(revocationPreimage *chainhash.Hash, + chanState *mig26.OpenChannel, + oldLog *mig.ChannelCommitment) (uint32, uint32, error) { + + // Init the output indexes as empty. + ourIndex := uint32(OutputIndexEmpty) + theirIndex := uint32(OutputIndexEmpty) + + chanCommit := oldLog + _, commitmentPoint := btcec.PrivKeyFromBytes(revocationPreimage[:]) + + // With the commitment point generated, we can now derive the king ring + // which will be used to generate the output scripts. + keyRing := DeriveCommitmentKeys( + commitmentPoint, false, chanState.ChanType, + &chanState.LocalChanCfg, &chanState.RemoteChanCfg, + ) + + // Since it's remote commitment chain, we'd used the mirrored values. + // + // We use the remote's channel config for the csv delay. + theirDelay := uint32(chanState.RemoteChanCfg.CsvDelay) + + // If we are the initiator of this channel, then it's be false from the + // remote's PoV. + isRemoteInitiator := !chanState.IsInitiator + + var leaseExpiry uint32 + if chanState.ChanType.HasLeaseExpiration() { + leaseExpiry = chanState.ThawHeight + } + + // Map the scripts from our PoV. When facing a local commitment, the to + // local output belongs to us and the to remote output belongs to them. + // When facing a remote commitment, the to local output belongs to them + // and the to remote output belongs to us. + + // Compute the to local script. From our PoV, when facing a remote + // commitment, the to local output belongs to them. + theirScript, err := CommitScriptToSelf( + chanState.ChanType, isRemoteInitiator, keyRing.ToLocalKey, + keyRing.RevocationKey, theirDelay, leaseExpiry, + ) + if err != nil { + return ourIndex, theirIndex, err + } + + // Compute the to remote script. From our PoV, when facing a remote + // commitment, the to remote output belongs to us. + ourScript, _, err := CommitScriptToRemote( + chanState.ChanType, isRemoteInitiator, keyRing.ToRemoteKey, + leaseExpiry, + ) + if err != nil { + return ourIndex, theirIndex, err + } + + // Now compare the scripts to find our/their output index. + for i, txOut := range chanCommit.CommitTx.TxOut { + switch { + case bytes.Equal(txOut.PkScript, ourScript.PkScript): + ourIndex = uint32(i) + case bytes.Equal(txOut.PkScript, theirScript.PkScript): + theirIndex = uint32(i) + } + } + + return ourIndex, theirIndex, nil +} + +// DeriveCommitmentKeys generates a new commitment key set using the base points +// and commitment point. The keys are derived differently depending on the type +// of channel, and whether the commitment transaction is ours or the remote +// peer's. +func DeriveCommitmentKeys(commitPoint *btcec.PublicKey, + isOurCommit bool, chanType mig25.ChannelType, + localChanCfg, remoteChanCfg *mig.ChannelConfig) *CommitmentKeyRing { + + tweaklessCommit := chanType.IsTweakless() + + // Depending on if this is our commit or not, we'll choose the correct + // base point. + localBasePoint := localChanCfg.PaymentBasePoint + if isOurCommit { + localBasePoint = localChanCfg.DelayBasePoint + } + + // First, we'll derive all the keys that don't depend on the context of + // whose commitment transaction this is. + keyRing := &CommitmentKeyRing{ + CommitPoint: commitPoint, + + LocalCommitKeyTweak: input.SingleTweakBytes( + commitPoint, localBasePoint.PubKey, + ), + LocalHtlcKeyTweak: input.SingleTweakBytes( + commitPoint, localChanCfg.HtlcBasePoint.PubKey, + ), + LocalHtlcKey: input.TweakPubKey( + localChanCfg.HtlcBasePoint.PubKey, commitPoint, + ), + RemoteHtlcKey: input.TweakPubKey( + remoteChanCfg.HtlcBasePoint.PubKey, commitPoint, + ), + } + + // We'll now compute the to_local, to_remote, and revocation key based + // on the current commitment point. All keys are tweaked each state in + // order to ensure the keys from each state are unlinkable. To create + // the revocation key, we take the opposite party's revocation base + // point and combine that with the current commitment point. + var ( + toLocalBasePoint *btcec.PublicKey + toRemoteBasePoint *btcec.PublicKey + revocationBasePoint *btcec.PublicKey + ) + if isOurCommit { + toLocalBasePoint = localChanCfg.DelayBasePoint.PubKey + toRemoteBasePoint = remoteChanCfg.PaymentBasePoint.PubKey + revocationBasePoint = remoteChanCfg.RevocationBasePoint.PubKey + } else { + toLocalBasePoint = remoteChanCfg.DelayBasePoint.PubKey + toRemoteBasePoint = localChanCfg.PaymentBasePoint.PubKey + revocationBasePoint = localChanCfg.RevocationBasePoint.PubKey + } + + // With the base points assigned, we can now derive the actual keys + // using the base point, and the current commitment tweak. + keyRing.ToLocalKey = input.TweakPubKey(toLocalBasePoint, commitPoint) + keyRing.RevocationKey = input.DeriveRevocationPubkey( + revocationBasePoint, commitPoint, + ) + + // If this commitment should omit the tweak for the remote point, then + // we'll use that directly, and ignore the commitPoint tweak. + if tweaklessCommit { + keyRing.ToRemoteKey = toRemoteBasePoint + + // If this is not our commitment, the above ToRemoteKey will be + // ours, and we blank out the local commitment tweak to + // indicate that the key should not be tweaked when signing. + if !isOurCommit { + keyRing.LocalCommitKeyTweak = nil + } + } else { + keyRing.ToRemoteKey = input.TweakPubKey( + toRemoteBasePoint, commitPoint, + ) + } + + return keyRing +} + +// CommitScriptToRemote derives the appropriate to_remote script based on the +// channel's commitment type. The `initiator` argument should correspond to the +// owner of the commitment transaction which we are generating the to_remote +// script for. The second return value is the CSV delay of the output script, +// what must be satisfied in order to spend the output. +func CommitScriptToRemote(chanType mig25.ChannelType, initiator bool, + key *btcec.PublicKey, leaseExpiry uint32) (*ScriptInfo, uint32, error) { + + switch { + // If we are not the initiator of a leased channel, then the remote + // party has an additional CLTV requirement in addition to the 1 block + // CSV requirement. + case chanType.HasLeaseExpiration() && !initiator: + script, err := input.LeaseCommitScriptToRemoteConfirmed( + key, leaseExpiry, + ) + if err != nil { + return nil, 0, err + } + + p2wsh, err := input.WitnessScriptHash(script) + if err != nil { + return nil, 0, err + } + + return &ScriptInfo{ + PkScript: p2wsh, + WitnessScript: script, + }, 1, nil + + // If this channel type has anchors, we derive the delayed to_remote + // script. + case chanType.HasAnchors(): + script, err := input.CommitScriptToRemoteConfirmed(key) + if err != nil { + return nil, 0, err + } + + p2wsh, err := input.WitnessScriptHash(script) + if err != nil { + return nil, 0, err + } + + return &ScriptInfo{ + PkScript: p2wsh, + WitnessScript: script, + }, 1, nil + + default: + // Otherwise the to_remote will be a simple p2wkh. + p2wkh, err := input.CommitScriptUnencumbered(key) + if err != nil { + return nil, 0, err + } + + // Since this is a regular P2WKH, the WitnessScipt and PkScript + // should both be set to the script hash. + return &ScriptInfo{ + WitnessScript: p2wkh, + PkScript: p2wkh, + }, 0, nil + } +} + +// CommitScriptToSelf constructs the public key script for the output on the +// commitment transaction paying to the "owner" of said commitment transaction. +// The `initiator` argument should correspond to the owner of the commitment +// transaction which we are generating the to_local script for. If the other +// party learns of the preimage to the revocation hash, then they can claim all +// the settled funds in the channel, plus the unsettled funds. +func CommitScriptToSelf(chanType mig25.ChannelType, initiator bool, + selfKey, revokeKey *btcec.PublicKey, csvDelay, leaseExpiry uint32) ( + *ScriptInfo, error) { + + var ( + toLocalRedeemScript []byte + err error + ) + switch { + // If we are the initiator of a leased channel, then we have an + // additional CLTV requirement in addition to the usual CSV requirement. + case initiator && chanType.HasLeaseExpiration(): + toLocalRedeemScript, err = input.LeaseCommitScriptToSelf( + selfKey, revokeKey, csvDelay, leaseExpiry, + ) + + default: + toLocalRedeemScript, err = input.CommitScriptToSelf( + csvDelay, selfKey, revokeKey, + ) + } + if err != nil { + return nil, err + } + + toLocalScriptHash, err := input.WitnessScriptHash(toLocalRedeemScript) + if err != nil { + return nil, err + } + + return &ScriptInfo{ + PkScript: toLocalScriptHash, + WitnessScript: toLocalRedeemScript, + }, nil +} diff --git a/channeldb/migration30/log.go b/channeldb/migration30/log.go new file mode 100644 index 0000000000..1a1672fe6f --- /dev/null +++ b/channeldb/migration30/log.go @@ -0,0 +1,14 @@ +package migration30 + +import ( + "github.com/btcsuite/btclog" +) + +// log is a logger that is initialized as disabled. This means the package will +// not perform any logging by default until a logger is set. +var log = btclog.Disabled + +// UseLogger uses a specified Logger to output package logging info. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/channeldb/migration30/migration.go b/channeldb/migration30/migration.go new file mode 100644 index 0000000000..fc551ac2a9 --- /dev/null +++ b/channeldb/migration30/migration.go @@ -0,0 +1,656 @@ +package migration30 + +import ( + "bytes" + "encoding/binary" + "fmt" + "math" + "sync" + + mig24 "github.com/lightningnetwork/lnd/channeldb/migration24" + mig26 "github.com/lightningnetwork/lnd/channeldb/migration26" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" + "github.com/lightningnetwork/lnd/kvdb" +) + +// recordsPerTx specifies the number of records to be migrated in each database +// transaction. In the worst case, each old revocation log is 28,057 bytes. +// 20,000 records would consume 0.56 GB of ram, which is feasible for a modern +// machine. +// +// NOTE: we could've used more ram but it doesn't help with the speed of the +// migration since the most of the CPU time is used for calculating the output +// indexes. +const recordsPerTx = 20_000 + +// MigrateRevocationLog migrates the old revocation logs into the newer format +// and deletes them once finished, with the deletion only happens once ALL the +// old logs have been migrates. +func MigrateRevocationLog(db kvdb.Backend) error { + log.Infof("Migrating revocation logs, might take a while...") + + var ( + err error + + // finished is used to exit the for loop. + finished bool + + // total is the number of total records. + total uint64 + + // migrated is the number of already migrated records. + migrated uint64 + ) + + // First of all, read the stats of the revocation logs. + total, migrated, err = logMigrationStat(db) + if err != nil { + return err + } + log.Infof("Total logs=%d, migrated=%d", total, migrated) + + // Exit early if the old logs have already been migrated and deleted. + if total == 0 { + log.Info("Migration already finished!") + return nil + } + + for { + if finished { + log.Infof("Migrating old revocation logs finished, " + + "now checking the migration results...") + break + } + + // Process the migration. + err = kvdb.Update(db, func(tx kvdb.RwTx) error { + finished, err = processMigration(tx) + if err != nil { + return err + } + return nil + }, func() {}) + if err != nil { + return err + } + + // Each time we finished the above process, we'd read the stats + // again to understand the current progress. + total, migrated, err = logMigrationStat(db) + if err != nil { + return err + } + + // Calculate and log the progress if the progress is less than + // one. + progress := float64(migrated) / float64(total) * 100 + if progress >= 100 { + continue + } + + log.Infof("Migration progress: %.3f%%, still have: %d", + progress, total-migrated) + } + + // Before we can safety delete the old buckets, we perform a check to + // make sure the logs are migrated as expected. + err = kvdb.Update(db, validateMigration, func() {}) + if err != nil { + return fmt.Errorf("validate migration failed: %v", err) + } + + log.Info("Migration check passed, now deleting the old logs...") + + // Once the migration completes, we can now safety delete the old + // revocation logs. + if err := deleteOldBuckets(db); err != nil { + return fmt.Errorf("deleteOldBuckets err: %v", err) + } + + log.Info("Old revocation log buckets removed!") + return nil +} + +// processMigration finds the next un-migrated revocation logs, reads a max +// number of `recordsPerTx` records, converts them into the new revocation logs +// and save them to disk. +func processMigration(tx kvdb.RwTx) (bool, error) { + openChanBucket := tx.ReadWriteBucket(openChannelBucket) + + // If no bucket is found, we can exit early. + if openChanBucket == nil { + return false, fmt.Errorf("root bucket not found") + } + + // Locate the next migration height. + locator, err := locateNextUpdateNum(openChanBucket) + if err != nil { + return false, fmt.Errorf("locator got error: %v", err) + } + + // If the returned locator is nil, we've done migrating the logs. + if locator == nil { + return true, nil + } + + // Read a list of old revocation logs. + entryMap, err := readOldRevocationLogs(openChanBucket, locator) + if err != nil { + return false, fmt.Errorf("read old logs err: %v", err) + } + + // Migrate the revocation logs. + return false, writeRevocationLogs(openChanBucket, entryMap) +} + +// deleteOldBuckets iterates all the channel buckets and deletes the old +// revocation buckets. +func deleteOldBuckets(db kvdb.Backend) error { + // locators records all the chan buckets found in the database. + var locators []*updateLocator + + // reader is a helper closure that saves the locator found. Each + // locator is relatively small(33+32+36+8=109 bytes), assuming 1 GB of + // ram we can fit roughly 10 million records. Since each record + // corresponds to a channel, we should have more than enough memory to + // read them all. + reader := func(_ kvdb.RwBucket, l *updateLocator) error { // nolint:unparam + locators = append(locators, l) + return nil + } + + // remover is a helper closure that removes the old revocation log + // bucket under the specified chan bucket by the given locator. + remover := func(rootBucket kvdb.RwBucket, l *updateLocator) error { + chanBucket, err := l.locateChanBucket(rootBucket) + if err != nil { + return err + } + + return chanBucket.DeleteNestedBucket( + revocationLogBucketDeprecated, + ) + } + + // Perform the deletion in one db transaction. This should not cause + // any memory issue as the deletion doesn't load any data from the + // buckets. + return kvdb.Update(db, func(tx kvdb.RwTx) error { + openChanBucket := tx.ReadWriteBucket(openChannelBucket) + + // Exit early if there's no bucket. + if openChanBucket == nil { + return nil + } + + // Iterate the buckets to find all the locators. + err := iterateBuckets(openChanBucket, nil, reader) + if err != nil { + return err + } + + // Iterate the locators and delete all the old revocation log + // buckets. + for _, l := range locators { + err := remover(openChanBucket, l) + // If the bucket doesn't exist, we can exit safety. + if err != nil && err != kvdb.ErrBucketNotFound { + return err + } + } + + return nil + }, func() {}) +} + +// writeRevocationLogs unwraps the entryMap and writes the new revocation logs. +func writeRevocationLogs(openChanBucket kvdb.RwBucket, + entryMap logEntries) error { + + for locator, logs := range entryMap { + // Find the channel bucket. + chanBucket, err := locator.locateChanBucket(openChanBucket) + if err != nil { + return fmt.Errorf("locateChanBucket err: %v", err) + } + + // Create the new log bucket. + logBucket, err := chanBucket.CreateBucketIfNotExists( + revocationLogBucket, + ) + if err != nil { + return fmt.Errorf("create log bucket err: %v", err) + } + + // Write the new logs. + for _, entry := range logs { + var b bytes.Buffer + err := serializeRevocationLog(&b, entry.log) + if err != nil { + return err + } + + logEntrykey := mig24.MakeLogKey(entry.commitHeight) + err = logBucket.Put(logEntrykey[:], b.Bytes()) + if err != nil { + return fmt.Errorf("putRevocationLog err: %v", + err) + } + } + } + + return nil +} + +// logMigrationStat reads the buckets to provide stats over current migration +// progress. The returned values are the numbers of total records and already +// migrated records. +func logMigrationStat(db kvdb.Backend) (uint64, uint64, error) { + var ( + err error + + // total is the number of total records. + total uint64 + + // unmigrated is the number of unmigrated records. + unmigrated uint64 + ) + + err = kvdb.Update(db, func(tx kvdb.RwTx) error { + total, unmigrated, err = fetchLogStats(tx) + return err + }, func() {}) + + log.Debugf("Total logs=%d, unmigrated=%d", total, unmigrated) + return total, total - unmigrated, err +} + +// fetchLogStats iterates all the chan buckets to provide stats about the logs. +// The returned values are num of total records, and num of un-migrated +// records. +func fetchLogStats(tx kvdb.RwTx) (uint64, uint64, error) { + var ( + total uint64 + totalUnmigrated uint64 + ) + + openChanBucket := tx.ReadWriteBucket(openChannelBucket) + + // If no bucket is found, we can exit early. + if openChanBucket == nil { + return 0, 0, fmt.Errorf("root bucket not found") + } + + // counter is a helper closure used to count the number of records + // based on the given bucket. + counter := func(chanBucket kvdb.RwBucket, bucket []byte) uint64 { + // Read the sub-bucket level 4. + logBucket := chanBucket.NestedReadBucket(bucket) + + // Exit early if we don't have the bucket. + if logBucket == nil { + return 0 + } + + // Jump to the end of the cursor. + key, _ := logBucket.ReadCursor().Last() + + // Since the CommitHeight is a zero-based monotonically + // increased index, its value plus one reflects the total + // records under this chan bucket. + lastHeight := binary.BigEndian.Uint64(key) + 1 + + return lastHeight + } + + // countTotal is a callback function used to count the total number of + // records. + countTotal := func(chanBucket kvdb.RwBucket, l *updateLocator) error { + total += counter(chanBucket, revocationLogBucketDeprecated) + return nil + } + + // countUnmigrated is a callback function used to count the total + // number of un-migrated records. + countUnmigrated := func(chanBucket kvdb.RwBucket, + l *updateLocator) error { + + totalUnmigrated += counter( + chanBucket, revocationLogBucketDeprecated, + ) + return nil + } + + // Locate the next migration height. + locator, err := locateNextUpdateNum(openChanBucket) + if err != nil { + return 0, 0, fmt.Errorf("locator got error: %v", err) + } + + // If the returned locator is not nil, we still have un-migrated + // records so we need to count them. Otherwise we've done migrating the + // logs. + if locator != nil { + err = iterateBuckets(openChanBucket, locator, countUnmigrated) + if err != nil { + return 0, 0, err + } + } + + // Count the total number of records by supplying a nil locator. + err = iterateBuckets(openChanBucket, nil, countTotal) + if err != nil { + return 0, 0, err + } + + return total, totalUnmigrated, err +} + +// logEntry houses the info needed to write a new revocation log. +type logEntry struct { + log *RevocationLog + commitHeight uint64 + ourIndex uint32 + theirIndex uint32 + locator *updateLocator +} + +// logEntries maps a bucket locator to a list of entries under that bucket. +type logEntries map[*updateLocator][]*logEntry + +// result is made of two channels that's used to send back the constructed new +// revocation log or an error. +type result struct { + newLog chan *logEntry + errChan chan error +} + +// readOldRevocationLogs finds a list of old revocation logs and converts them +// into the new revocation logs. +func readOldRevocationLogs(openChanBucket kvdb.RwBucket, + locator *updateLocator) (logEntries, error) { + + entries := make(logEntries) + results := make([]*result, 0) + + var wg sync.WaitGroup + + // collectLogs is a helper closure that reads all newly created + // revocation logs sent over the result channels. + // + // NOTE: the order of the logs cannot be guaranteed, which is fine as + // boltdb will take care of the orders when saving them. + collectLogs := func() error { + wg.Wait() + + for _, r := range results { + select { + case entry := <-r.newLog: + entries[entry.locator] = append( + entries[entry.locator], entry, + ) + + case err := <-r.errChan: + return err + } + } + + return nil + } + + // createLog is a helper closure that constructs a new revocation log. + // + // NOTE: used as a goroutine. + createLog := func(chanState *mig26.OpenChannel, + c mig.ChannelCommitment, l *updateLocator, r *result) { + + defer wg.Done() + + // Find the output indexes. + ourIndex, theirIndex, err := findOutputIndexes(chanState, &c) + if err != nil { + r.errChan <- err + } + + // Convert the old logs into the new logs. We do this early in + // the read tx so the old large revocation log can be set to + // nil here so save us some memory space. + newLog, err := convertRevocationLog(&c, ourIndex, theirIndex) + if err != nil { + r.errChan <- err + } + // Create the entry that will be used to create the new log. + entry := &logEntry{ + log: newLog, + commitHeight: c.CommitHeight, + ourIndex: ourIndex, + theirIndex: theirIndex, + locator: l, + } + + r.newLog <- entry + } + + // innerCb is the stepping function used when iterating the old log + // bucket. + innerCb := func(chanState *mig26.OpenChannel, l *updateLocator, + _, v []byte) error { + + reader := bytes.NewReader(v) + c, err := mig.DeserializeChanCommit(reader) + if err != nil { + return err + } + + r := &result{ + newLog: make(chan *logEntry, 1), + errChan: make(chan error, 1), + } + results = append(results, r) + + // We perform the log creation in a goroutine as it takes some + // time to compute and find output indexes. + wg.Add(1) + go createLog(chanState, c, l, r) + + // Check the records read so far and signals exit when we've + // reached our memory cap. + if len(results) >= recordsPerTx { + return errExit + } + + return nil + } + + // cb is the callback function to be used when iterating the buckets. + cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error { + // Read the open channel. + c := &mig26.OpenChannel{} + err := mig26.FetchChanInfo(chanBucket, c, false) + if err != nil { + return fmt.Errorf("unable to fetch chan info: %v", err) + } + + err = fetchChanRevocationState(chanBucket, c) + if err != nil { + return fmt.Errorf("unable to fetch revocation "+ + "state: %v", err) + } + + // Read the sub-bucket level 4. + logBucket := chanBucket.NestedReadBucket( + revocationLogBucketDeprecated, + ) + // Exit early if we don't have the old bucket. + if logBucket == nil { + return nil + } + + // Init the map key when needed. + _, ok := entries[l] + if !ok { + entries[l] = make([]*logEntry, 0, recordsPerTx) + } + + return iterator( + logBucket, locator.nextHeight, + func(k, v []byte) error { + // Reset the nextHeight for following chan + // buckets. + locator.nextHeight = nil + return innerCb(c, l, k, v) + }, + ) + } + + err := iterateBuckets(openChanBucket, locator, cb) + // If there's an error and it's not exit signal, we won't collect the + // logs from the result channels. + if err != nil && err != errExit { + return nil, err + } + + // Otherwise, collect the logs. + err = collectLogs() + + return entries, err +} + +// convertRevocationLog uses the fields `CommitTx` and `Htlcs` from a +// ChannelCommitment to construct a revocation log entry. +func convertRevocationLog(commit *mig.ChannelCommitment, + ourOutputIndex, theirOutputIndex uint32) (*RevocationLog, error) { + + // Sanity check that the output indexes can be safely converted. + if ourOutputIndex > math.MaxUint16 { + return nil, ErrOutputIndexTooBig + } + if theirOutputIndex > math.MaxUint16 { + return nil, ErrOutputIndexTooBig + } + + rl := &RevocationLog{ + OurOutputIndex: uint16(ourOutputIndex), + TheirOutputIndex: uint16(theirOutputIndex), + CommitTxHash: commit.CommitTx.TxHash(), + HTLCEntries: make([]*HTLCEntry, 0, len(commit.Htlcs)), + } + + for _, htlc := range commit.Htlcs { + // Skip dust HTLCs. + if htlc.OutputIndex < 0 { + continue + } + + // Sanity check that the output indexes can be safely + // converted. + if htlc.OutputIndex > math.MaxUint16 { + return nil, ErrOutputIndexTooBig + } + + entry := &HTLCEntry{ + RHash: htlc.RHash, + RefundTimeout: htlc.RefundTimeout, + Incoming: htlc.Incoming, + OutputIndex: uint16(htlc.OutputIndex), + Amt: htlc.Amt.ToSatoshis(), + } + rl.HTLCEntries = append(rl.HTLCEntries, entry) + } + + return rl, nil +} + +// validateMigration checks that the data saved in the new buckets match those +// saved in the old buckets. It does so by checking the last keys saved in both +// buckets can match, given the assumption that the `CommitHeight` is +// monotonically increased value so the last key represents the total number of +// records saved. +func validateMigration(tx kvdb.RwTx) error { + openChanBucket := tx.ReadWriteBucket(openChannelBucket) + + // If no bucket is found, we can exit early. + if openChanBucket == nil { + return nil + } + + // exitWithErr is a helper closure that prepends an error message with + // the locator info. + exitWithErr := func(l *updateLocator, msg string) error { + return fmt.Errorf("unmatched records found under : %v", l.nodePub, + l.chainHash, l.fundingOutpoint, msg) + } + + // cb is the callback function to be used when iterating the buckets. + cb := func(chanBucket kvdb.RwBucket, l *updateLocator) error { + // Read both the old and new revocation log buckets. + oldBucket := chanBucket.NestedReadBucket( + revocationLogBucketDeprecated, + ) + newBucket := chanBucket.NestedReadBucket(revocationLogBucket) + + // Exit early if the old bucket is nil. + // + // NOTE: the new bucket may not be nil here as new logs might + // have been created using lnd@v0.15.0. + if oldBucket == nil { + return nil + } + + // Return an error if the expected new bucket cannot be found. + if newBucket == nil { + return exitWithErr(l, "expected new bucket") + } + + // Acquire the cursors. + oldCursor := oldBucket.ReadCursor() + newCursor := newBucket.ReadCursor() + + // Jump to the end of the cursors to do a quick check. + newKey, _ := oldCursor.Last() + oldKey, _ := newCursor.Last() + + // We expected the CommitHeights to be matched for nodes prior + // to v0.15.0. + if bytes.Equal(newKey, oldKey) { + return nil + } + + // If the keys do not match, it's likely the node is running + // v0.15.0 and have new logs created. In this case, we will + // validate that every record in the old bucket can be found in + // the new bucket. + oldKey, _ = oldCursor.First() + + for { + // Try to locate the old key in the new bucket and we + // expect it to be found. + newKey, _ := newCursor.Seek(oldKey) + + // If the old key is not found in the new bucket, + // return an error. + // + // NOTE: because Seek will return the next key when the + // passed key cannot be found, we need to compare the + // keys to deicde whether the old key is found or not. + if !bytes.Equal(newKey, oldKey) { + errMsg := fmt.Sprintf("old bucket has "+ + "CommitHeight=%v cannot be found in "+ + "new bucket", oldKey) + return exitWithErr(l, errMsg) + } + + // Otherwise, keep iterating the old bucket. + oldKey, _ = oldCursor.Next() + + // If we've done iterating, all keys have been matched + // and we can safely exit. + if oldKey == nil { + return nil + } + } + } + + return iterateBuckets(openChanBucket, nil, cb) +} diff --git a/channeldb/migration30/migration_test.go b/channeldb/migration30/migration_test.go new file mode 100644 index 0000000000..573598ec3c --- /dev/null +++ b/channeldb/migration30/migration_test.go @@ -0,0 +1,574 @@ +package migration30 + +import ( + "bytes" + "fmt" + "testing" + + mig25 "github.com/lightningnetwork/lnd/channeldb/migration25" + mig26 "github.com/lightningnetwork/lnd/channeldb/migration26" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" + "github.com/lightningnetwork/lnd/channeldb/migtest" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type ( + beforeMigrationFunc func(db kvdb.Backend) error + afterMigrationFunc func(t *testing.T, db kvdb.Backend) +) + +// TestMigrateRevocationLog provide a comprehensive test for the revocation log +// migration. The revocation logs are stored inside a deeply nested bucket, and +// can be accessed via nodePub:chainHash:fundingOutpoint:revocationLogBucket. +// Based on each value in the chain, we'd end up in a different db state. This +// test alters nodePub, fundingOutpoint, and revocationLogBucket to test +// against possible db states, leaving the chainHash staying the same as it's +// less likely to be changed. In specific, we test based on whether we have one +// or two peers(nodePub). For each peer, we test whether we have one or two +// channels(fundingOutpoint). And for each channel, we test 5 cases based on +// the revocation migration states(see buildChannelCases). The total states +// grow quickly and the test may take longer than 5min. +func TestMigrateRevocationLog(t *testing.T) { + t.Parallel() + + testCases := make([]*testCase, 0) + + // Create two peers, each has two channels. + alice1, alice2 := createTwoChannels() + bob1, bob2 := createTwoChannels() + + // Sort the two peers to match the order saved in boltdb. + if bytes.Compare( + alice1.IdentityPub.SerializeCompressed(), + bob1.IdentityPub.SerializeCompressed(), + ) > 0 { + + alice1, bob1 = bob1, alice1 + alice2, bob2 = bob2, alice2 + } + + // Build test cases for two peers. Each peer is independent so we + // combine the test cases based on its current db state. This would + // create a total of 30x30=900 cases. + for _, p1 := range buildPeerCases(alice1, alice2, false) { + for _, p2 := range buildPeerCases(bob1, bob2, p1.unfinished) { + setups := make([]beforeMigrationFunc, 0) + setups = append(setups, p1.setups...) + setups = append(setups, p2.setups...) + + asserters := make([]afterMigrationFunc, 0) + asserters = append(asserters, p1.asserters...) + asserters = append(asserters, p2.asserters...) + + name := fmt.Sprintf("alice: %s, bob: %s", + p1.name, p2.name) + + tc := &testCase{ + name: name, + setups: setups, + asserters: asserters, + } + testCases = append(testCases, tc) + } + } + + fmt.Printf("Running %d test cases...\n", len(testCases)) + + for i, tc := range testCases { + tc := tc + + // Construct a test case name that can be easily traced. + name := fmt.Sprintf("case_%d", i) + fmt.Println(name, tc.name) + + success := t.Run(name, func(t *testing.T) { + // Log the test's actual name on failure. + t.Log("Test setup: ", tc.name) + + beforeMigration := func(db kvdb.Backend) error { + for _, setup := range tc.setups { + if err := setup(db); err != nil { + return err + } + } + return nil + } + + afterMigration := func(db kvdb.Backend) error { + for _, asserter := range tc.asserters { + asserter(t, db) + } + return nil + } + + migtest.ApplyMigrationWithDb( + t, + beforeMigration, + afterMigration, + MigrateRevocationLog, + ) + }) + if !success { + return + } + } +} + +// TestValidateMigration checks that the function `validateMigration` behaves +// as expected. +func TestValidateMigration(t *testing.T) { + c := createTestChannel(nil) + + testCases := []struct { + name string + setup func(db kvdb.Backend) error + expectFail bool + }{ + { + // Finished prior to v0.15.0. + name: "valid migration", + setup: func(db kvdb.Backend) error { + return createFinished(db, c, true) + }, + expectFail: false, + }, + { + // Finished after to v0.15.0. + name: "valid migration after v0.15.0", + setup: func(db kvdb.Backend) error { + return createFinished(db, c, false) + }, + expectFail: false, + }, + { + // Missing logs prior to v0.15.0. + name: "invalid migration", + setup: func(db kvdb.Backend) error { + return createNotFinished(db, c, true) + }, + expectFail: true, + }, + { + // Missing logs after to v0.15.0. + name: "invalid migration after v0.15.0", + setup: func(db kvdb.Backend) error { + return createNotFinished(db, c, false) + }, + expectFail: true, + }, + } + + for _, tc := range testCases { + tc := tc + + // Create a test db. + cdb, cleanUp, err := migtest.MakeDB() + defer cleanUp() + require.NoError(t, err, "failed to create test db") + + t.Run(tc.name, func(t *testing.T) { + // Setup test logs. + err := tc.setup(cdb) + require.NoError(t, err, "failed to setup") + + // Call the actual function and check the error is + // returned as expected. + err = kvdb.Update(cdb, validateMigration, func() {}) + + if tc.expectFail { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// createTwoChannels creates two channels that have the same chainHash and +// IdentityPub, simulating having two channels under the same peer. +func createTwoChannels() (*mig26.OpenChannel, *mig26.OpenChannel) { + // Create two channels under the same peer. + c1 := createTestChannel(nil) + c2 := createTestChannel(c1.IdentityPub) + + // If c1 is greater than c2, boltdb will put c2 before c1. + if bytes.Compare( + c1.FundingOutpoint.Hash[:], + c2.FundingOutpoint.Hash[:], + ) > 0 { + + c1, c2 = c2, c1 + } + + return c1, c2 +} + +// channelTestCase defines a single test case given a particular channel state. +type channelTestCase struct { + name string + setup beforeMigrationFunc + asserter afterMigrationFunc + unfinished bool +} + +// buildChannelCases builds five channel test cases. These cases can be viewed +// as basic units that are used to build more complex test cases based on +// number of channels and peers. +func buildChannelCases(c *mig26.OpenChannel, + overwrite bool) []*channelTestCase { + + // assertNewLogs is a helper closure that checks the old bucket and the + // two new logs are saved. + assertNewLogs := func(t *testing.T, db kvdb.Backend) { + // Check that the old bucket is removed. + assertOldLogBucketDeleted(t, db, c) + + l := fetchNewLog(t, db, c, logHeight1) + assertRevocationLog(t, newLog1, l) + + l = fetchNewLog(t, db, c, logHeight2) + assertRevocationLog(t, newLog2, l) + } + + // case1 defines a case where we don't have a chanBucket. + case1 := &channelTestCase{ + name: "no channel", + setup: func(db kvdb.Backend) error { + return setupTestLogs(db, nil, nil, nil) + }, + // No need to assert anything. + asserter: func(t *testing.T, db kvdb.Backend) {}, + } + + // case2 defines a case when the chanBucket has no old revocation logs. + case2 := &channelTestCase{ + name: "empty old logs", + setup: func(db kvdb.Backend) error { + return setupTestLogs(db, c, nil, nil) + }, + // No need to assert anything. + asserter: func(t *testing.T, db kvdb.Backend) {}, + } + + // case3 defines a case when the chanBucket has finished its migration. + case3 := &channelTestCase{ + name: "finished migration", + setup: func(db kvdb.Backend) error { + return createFinished(db, c, true) + }, + asserter: func(t *testing.T, db kvdb.Backend) { + // Check that the old bucket is removed. + assertOldLogBucketDeleted(t, db, c) + + // Fetch the new log. We should see + // OurOutputIndex matching the testOurIndex + // value, indicating that for migrated logs we + // won't touch them. + // + // NOTE: when the log is created before + // migration, OurOutputIndex would be + // testOurIndex rather than OutputIndexEmpty. + l := fetchNewLog(t, db, c, logHeight1) + require.EqualValues( + t, testOurIndex, l.OurOutputIndex, + "expected log to be NOT overwritten", + ) + + // Fetch the new log. We should see + // TheirOutputIndex matching the testTheirIndex + // value, indicating that for migrated logs we + // won't touch them. + // + // NOTE: when the log is created before + // migration, TheirOutputIndex would be + // testTheirIndex rather than OutputIndexEmpty. + l = fetchNewLog(t, db, c, logHeight2) + require.EqualValues( + t, testTheirIndex, l.TheirOutputIndex, + "expected log to be NOT overwritten", + ) + }, + } + + // case4 defines a case when the chanBucket has both old and new logs, + // which happens when the migration is ongoing. + case4 := &channelTestCase{ + name: "unfinished migration", + setup: func(db kvdb.Backend) error { + return createNotFinished(db, c, true) + }, + asserter: func(t *testing.T, db kvdb.Backend) { + // Check that the old bucket is removed. + assertOldLogBucketDeleted(t, db, c) + + // Fetch the new log. We should see + // OurOutputIndex matching the testOurIndex + // value, indicating that for migrated logs we + // won't touch them. + // + // NOTE: when the log is created before + // migration, OurOutputIndex would be + // testOurIndex rather than OutputIndexEmpty. + l := fetchNewLog(t, db, c, logHeight1) + require.EqualValues( + t, testOurIndex, l.OurOutputIndex, + "expected log to be NOT overwritten", + ) + + // We expect to have one new log. + l = fetchNewLog(t, db, c, logHeight2) + assertRevocationLog(t, newLog2, l) + }, + unfinished: true, + } + + // case5 defines a case when the chanBucket has no new logs, which + // happens when we haven't migrated anything for this bucket yet. + case5 := &channelTestCase{ + name: "initial migration", + setup: func(db kvdb.Backend) error { + return createNotStarted(db, c, true) + }, + asserter: assertNewLogs, + unfinished: true, + } + + // Check that the already migrated logs are overwritten. For two + // channels sorted and stored in boltdb, when the first channel has + // unfinished migrations, even channel two has migrated logs, they will + // be overwritten to make sure the data stay consistent. + if overwrite { + case3.name += " overwritten" + case3.asserter = assertNewLogs + + case4.name += " overwritten" + case4.asserter = assertNewLogs + } + + return []*channelTestCase{case1, case2, case3, case4, case5} +} + +// testCase defines a case for a particular db state that we want to test based +// on whether we have one or two peers, one or two channels for each peer, and +// the particular state for each channel. +type testCase struct { + // name has the format: peer: [channel state]. + name string + + // setups is a list of setup functions we'd run sequentially to provide + // the initial db state. + setups []beforeMigrationFunc + + // asserters is a list of assertions we'd perform after the migration + // function has been called. + asserters []afterMigrationFunc + + // unfinished specifies that the test case is testing a case where the + // revocation migration is considered unfinished. This is useful if + // it's used to construct a larger test case where there's a following + // case with a state of finished, we can then test that the revocation + // logs are overwritten even if the state says finished. + unfinished bool +} + +// buildPeerCases builds test cases based on whether we have one or two +// channels saved under this peer. When there's one channel, we have 5 states, +// and when there are two, we have 25 states, a total of 30 cases. +func buildPeerCases(c1, c2 *mig26.OpenChannel, unfinished bool) []*testCase { + testCases := make([]*testCase, 0) + + // Single peer with one channel. + for _, c := range buildChannelCases(c1, unfinished) { + name := fmt.Sprintf("[channel: %s]", c.name) + tc := &testCase{ + name: name, + setups: []beforeMigrationFunc{c.setup}, + asserters: []afterMigrationFunc{c.asserter}, + unfinished: c.unfinished, + } + testCases = append(testCases, tc) + } + + // Single peer with two channels. + testCases = append( + testCases, buildTwoChannelCases(c1, c2, unfinished)..., + ) + + return testCases +} + +// buildTwoChannelCases takes two channels to build test cases that covers all +// combinations of the two channels' state. Since each channel has 5 states, +// this will give us a total 25 states. +func buildTwoChannelCases(c1, c2 *mig26.OpenChannel, + unfinished bool) []*testCase { + + testCases := make([]*testCase, 0) + + // buildCase is a helper closure that contructs a test case based on + // the two smaller test cases. + buildCase := func(tc1, tc2 *channelTestCase) { + setups := make([]beforeMigrationFunc, 0) + setups = append(setups, tc1.setup) + setups = append(setups, tc2.setup) + + asserters := make([]afterMigrationFunc, 0) + asserters = append(asserters, tc1.asserter) + asserters = append(asserters, tc2.asserter) + + // If any of the test cases has unfinished state, the test case + // would have a state of unfinished, indicating any peers after + // this one must overwrite their revocation logs. + unfinished := tc1.unfinished || tc2.unfinished + + name := fmt.Sprintf("[channelOne: %s] [channelTwo: %s]", + tc1.name, tc2.name) + + tc := &testCase{ + name: name, + setups: setups, + asserters: asserters, + unfinished: unfinished, + } + testCases = append(testCases, tc) + } + + // Build channel cases for both of the channels and combine them. + for _, tc1 := range buildChannelCases(c1, unfinished) { + // The second channel's already migrated logs will be + // overwritten if the first channel has unfinished state, which + // are case4 and case5. + unfinished := unfinished || tc1.unfinished + for _, tc2 := range buildChannelCases(c2, unfinished) { + buildCase(tc1, tc2) + } + } + + return testCases +} + +// assertOldLogBucketDeleted asserts that the given channel's old revocation +// log bucket doesn't exist. +func assertOldLogBucketDeleted(t testing.TB, cdb kvdb.Backend, + c *mig26.OpenChannel) { + + var logBucket kvdb.RBucket + err := kvdb.Update(cdb, func(tx kvdb.RwTx) error { + chanBucket, err := mig25.FetchChanBucket(tx, &c.OpenChannel) + if err != nil { + return err + } + + logBucket = chanBucket.NestedReadBucket( + revocationLogBucketDeprecated, + ) + return err + }, func() {}) + + require.NoError(t, err, "read bucket failed") + require.Nil(t, logBucket, "expected old bucket to be deleted") +} + +// fetchNewLog asserts a revocation log can be found using the given updateNum +// for the specified channel. +func fetchNewLog(t testing.TB, cdb kvdb.Backend, + c *mig26.OpenChannel, updateNum uint64) RevocationLog { + + var newLog RevocationLog + err := kvdb.Update(cdb, func(tx kvdb.RwTx) error { + chanBucket, err := mig25.FetchChanBucket(tx, &c.OpenChannel) + if err != nil { + return err + } + + logBucket, err := fetchLogBucket(chanBucket) + if err != nil { + return err + } + + newLog, err = fetchRevocationLog(logBucket, updateNum) + return err + }, func() {}) + + require.NoError(t, err, "failed to query revocation log") + + return newLog +} + +// assertRevocationLog asserts two revocation logs are equal. +func assertRevocationLog(t testing.TB, want, got RevocationLog) { + require.Equal(t, want.OurOutputIndex, got.OurOutputIndex, + "wrong OurOutputIndex") + require.Equal(t, want.TheirOutputIndex, got.TheirOutputIndex, + "wrong TheirOutputIndex") + require.Equal(t, want.CommitTxHash, got.CommitTxHash, + "wrong CommitTxHash") + require.Equal(t, len(want.HTLCEntries), len(got.HTLCEntries), + "wrong HTLCEntries length") + + for i, expectedHTLC := range want.HTLCEntries { + htlc := got.HTLCEntries[i] + require.Equal(t, expectedHTLC.Amt, htlc.Amt, "wrong Amt") + require.Equal(t, expectedHTLC.RHash, htlc.RHash, "wrong RHash") + require.Equal(t, expectedHTLC.Incoming, htlc.Incoming, + "wrong Incoming") + require.Equal(t, expectedHTLC.OutputIndex, htlc.OutputIndex, + "wrong OutputIndex") + require.Equal(t, expectedHTLC.RefundTimeout, htlc.RefundTimeout, + "wrong RefundTimeout") + } +} + +// BenchmarkMigration creates a benchmark test for the migration. The test uses +// the flag `-benchtime` to specify how many revocation logs we want to test. +func BenchmarkMigration(b *testing.B) { + // Stop the timer and start it again later when the actual migration + // starts. + b.StopTimer() + + // Gather number of records by reading `-benchtime` flag. + numLogs := b.N + + // Create a mock store. + mockStore := &mockStore{} + mockStore.On("AddNextEntry", mock.Anything).Return(nil) + mockStore.On("Encode", mock.Anything).Return(nil) + + // Build the test data. + oldLogs := make([]mig.ChannelCommitment, numLogs) + beforeMigration := func(db kvdb.Backend) error { + fmt.Printf("\nBuilding test data for %d logs...\n", numLogs) + defer fmt.Println("Finished building test data, migrating...") + + // We use a mock store here to bypass the check in + // `AddNextEntry` so we don't need a "read" preimage here. This + // shouldn't affect our benchmark result as the migration will + // load the actual store from db. + c := createTestChannel(nil) + c.RevocationStore = mockStore + + // Create the test logs. + for i := 0; i < numLogs; i++ { + oldLog := oldLog2 + oldLog.CommitHeight = uint64(i) + oldLogs[i] = oldLog + } + + return setupTestLogs(db, c, oldLogs, nil) + } + + // Run the migration test. + migtest.ApplyMigrationWithDb( + b, + beforeMigration, + nil, + func(db kvdb.Backend) error { + b.StartTimer() + defer b.StopTimer() + + return MigrateRevocationLog(db) + }, + ) +} diff --git a/channeldb/migration30/revocation_log.go b/channeldb/migration30/revocation_log.go new file mode 100644 index 0000000000..e220131dd3 --- /dev/null +++ b/channeldb/migration30/revocation_log.go @@ -0,0 +1,551 @@ +package migration30 + +import ( + "bytes" + "errors" + "io" + "math" + + mig24 "github.com/lightningnetwork/lnd/channeldb/migration24" + mig25 "github.com/lightningnetwork/lnd/channeldb/migration25" + mig26 "github.com/lightningnetwork/lnd/channeldb/migration26" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/tlv" +) + +// OutputIndexEmpty is used when the output index doesn't exist. +const OutputIndexEmpty = math.MaxUint16 + +var ( + // revocationLogBucketDeprecated is dedicated for storing the necessary + // delta state between channel updates required to re-construct a past + // state in order to punish a counterparty attempting a non-cooperative + // channel closure. This key should be accessed from within the + // sub-bucket of a target channel, identified by its channel point. + // + // Deprecated: This bucket is kept for read-only in case the user + // choose not to migrate the old data. + revocationLogBucketDeprecated = []byte("revocation-log-key") + + // revocationLogBucket is a sub-bucket under openChannelBucket. This + // sub-bucket is dedicated for storing the minimal info required to + // re-construct a past state in order to punish a counterparty + // attempting a non-cooperative channel closure. + revocationLogBucket = []byte("revocation-log") + + // revocationStateKey stores their current revocation hash, our + // preimage producer and their preimage store. + revocationStateKey = []byte("revocation-state-key") + + // ErrNoRevocationsFound is returned when revocation state for a + // particular channel cannot be found. + ErrNoRevocationsFound = errors.New("no revocations found") + + // ErrLogEntryNotFound is returned when we cannot find a log entry at + // the height requested in the revocation log. + ErrLogEntryNotFound = errors.New("log entry not found") + + // ErrOutputIndexTooBig is returned when the output index is greater + // than uint16. + ErrOutputIndexTooBig = errors.New("output index is over uint16") +) + +// HTLCEntry specifies the minimal info needed to be stored on disk for ALL the +// historical HTLCs, which is useful for constructing RevocationLog when a +// breach is detected. +// The actual size of each HTLCEntry varies based on its RHash and Amt(sat), +// summarized as follows, +// +// | RHash empty | Amt<=252 | Amt<=65,535 | Amt<=4,294,967,295 | otherwise | +// |:-----------:|:--------:|:-----------:|:------------------:|:---------:| +// | true | 19 | 21 | 23 | 26 | +// | false | 51 | 53 | 55 | 58 | +// +// So the size varies from 19 bytes to 58 bytes, where most likely to be 23 or +// 55 bytes. +// +// NOTE: all the fields saved to disk use the primitive go types so they can be +// made into tlv records without further conversion. +type HTLCEntry struct { + // RHash is the payment hash of the HTLC. + RHash [32]byte + + // RefundTimeout is the absolute timeout on the HTLC that the sender + // must wait before reclaiming the funds in limbo. + RefundTimeout uint32 + + // OutputIndex is the output index for this particular HTLC output + // within the commitment transaction. + // + // NOTE: we use uint16 instead of int32 here to save us 2 bytes, which + // gives us a max number of HTLCs of 65K. + OutputIndex uint16 + + // Incoming denotes whether we're the receiver or the sender of this + // HTLC. + // + // NOTE: this field is the memory representation of the field + // incomingUint. + Incoming bool + + // Amt is the amount of satoshis this HTLC escrows. + // + // NOTE: this field is the memory representation of the field amtUint. + Amt btcutil.Amount + + // amtTlv is the uint64 format of Amt. This field is created so we can + // easily make it into a tlv record and save it to disk. + // + // NOTE: we keep this field for accounting purpose only. If the disk + // space becomes an issue, we could delete this field to save us extra + // 8 bytes. + amtTlv uint64 + + // incomingTlv is the uint8 format of Incoming. This field is created + // so we can easily make it into a tlv record and save it to disk. + incomingTlv uint8 +} + +// RHashLen is used by MakeDynamicRecord to return the size of the RHash. +// +// NOTE: for zero hash, we return a length 0. +func (h *HTLCEntry) RHashLen() uint64 { + if h.RHash == lntypes.ZeroHash { + return 0 + } + return 32 +} + +// RHashEncoder is the customized encoder which skips encoding the empty hash. +func RHashEncoder(w io.Writer, val interface{}, buf *[8]byte) error { + v, ok := val.(*[32]byte) + if !ok { + return tlv.NewTypeForEncodingErr(val, "RHash") + } + + // If the value is an empty hash, we will skip encoding it. + if *v == lntypes.ZeroHash { + return nil + } + + return tlv.EBytes32(w, v, buf) +} + +// RHashDecoder is the customized decoder which skips decoding the empty hash. +func RHashDecoder(r io.Reader, val interface{}, buf *[8]byte, l uint64) error { + v, ok := val.(*[32]byte) + if !ok { + return tlv.NewTypeForEncodingErr(val, "RHash") + } + + // If the length is zero, we will skip encoding the empty hash. + if l == 0 { + return nil + } + + return tlv.DBytes32(r, v, buf, 32) +} + +// toTlvStream converts an HTLCEntry record into a tlv representation. +func (h *HTLCEntry) toTlvStream() (*tlv.Stream, error) { + const ( + // A set of tlv type definitions used to serialize htlc entries + // to the database. We define it here instead of the head of + // the file to avoid naming conflicts. + // + // NOTE: A migration should be added whenever this list + // changes. + rHashType tlv.Type = 0 + refundTimeoutType tlv.Type = 1 + outputIndexType tlv.Type = 2 + incomingType tlv.Type = 3 + amtType tlv.Type = 4 + ) + + return tlv.NewStream( + tlv.MakeDynamicRecord( + rHashType, &h.RHash, h.RHashLen, + RHashEncoder, RHashDecoder, + ), + tlv.MakePrimitiveRecord( + refundTimeoutType, &h.RefundTimeout, + ), + tlv.MakePrimitiveRecord( + outputIndexType, &h.OutputIndex, + ), + tlv.MakePrimitiveRecord(incomingType, &h.incomingTlv), + // We will save 3 bytes if the amount is less or equal to + // 4,294,967,295 msat, or roughly 0.043 bitcoin. + tlv.MakeBigSizeRecord(amtType, &h.amtTlv), + ) +} + +// RevocationLog stores the info needed to construct a breach retribution. Its +// fields can be viewed as a subset of a ChannelCommitment's. In the database, +// all historical versions of the RevocationLog are saved using the +// CommitHeight as the key. +// +// NOTE: all the fields use the primitive go types so they can be made into tlv +// records without further conversion. +type RevocationLog struct { + // OurOutputIndex specifies our output index in this commitment. In a + // remote commitment transaction, this is the to remote output index. + OurOutputIndex uint16 + + // TheirOutputIndex specifies their output index in this commitment. In + // a remote commitment transaction, this is the to local output index. + TheirOutputIndex uint16 + + // CommitTxHash is the hash of the latest version of the commitment + // state, broadcast able by us. + CommitTxHash [32]byte + + // HTLCEntries is the set of HTLCEntry's that are pending at this + // particular commitment height. + HTLCEntries []*HTLCEntry +} + +// toTlvStream converts an RevocationLog record into a tlv representation. +func (rl *RevocationLog) toTlvStream() (*tlv.Stream, error) { + const ( + // A set of tlv type definitions used to serialize the body of + // revocation logs to the database. We define it here instead + // of the head of the file to avoid naming conflicts. + // + // NOTE: A migration should be added whenever this list + // changes. + ourOutputIndexType tlv.Type = 0 + theirOutputIndexType tlv.Type = 1 + commitTxHashType tlv.Type = 2 + ) + + return tlv.NewStream( + tlv.MakePrimitiveRecord(ourOutputIndexType, &rl.OurOutputIndex), + tlv.MakePrimitiveRecord( + theirOutputIndexType, &rl.TheirOutputIndex, + ), + tlv.MakePrimitiveRecord(commitTxHashType, &rl.CommitTxHash), + ) +} + +// putRevocationLog uses the fields `CommitTx` and `Htlcs` from a +// ChannelCommitment to construct a revocation log entry and saves them to +// disk. It also saves our output index and their output index, which are +// useful when creating breach retribution. +func putRevocationLog(bucket kvdb.RwBucket, commit *mig.ChannelCommitment, + ourOutputIndex, theirOutputIndex uint32) error { + + // Sanity check that the output indexes can be safely converted. + if ourOutputIndex > math.MaxUint16 { + return ErrOutputIndexTooBig + } + if theirOutputIndex > math.MaxUint16 { + return ErrOutputIndexTooBig + } + + rl := &RevocationLog{ + OurOutputIndex: uint16(ourOutputIndex), + TheirOutputIndex: uint16(theirOutputIndex), + CommitTxHash: commit.CommitTx.TxHash(), + HTLCEntries: make([]*HTLCEntry, 0, len(commit.Htlcs)), + } + + for _, htlc := range commit.Htlcs { + // Skip dust HTLCs. + if htlc.OutputIndex < 0 { + continue + } + + // Sanity check that the output indexes can be safely + // converted. + if htlc.OutputIndex > math.MaxUint16 { + return ErrOutputIndexTooBig + } + + entry := &HTLCEntry{ + RHash: htlc.RHash, + RefundTimeout: htlc.RefundTimeout, + Incoming: htlc.Incoming, + OutputIndex: uint16(htlc.OutputIndex), + Amt: htlc.Amt.ToSatoshis(), + } + rl.HTLCEntries = append(rl.HTLCEntries, entry) + } + + var b bytes.Buffer + err := serializeRevocationLog(&b, rl) + if err != nil { + return err + } + + logEntrykey := mig24.MakeLogKey(commit.CommitHeight) + return bucket.Put(logEntrykey[:], b.Bytes()) +} + +// fetchRevocationLog queries the revocation log bucket to find an log entry. +// Return an error if not found. +func fetchRevocationLog(log kvdb.RBucket, + updateNum uint64) (RevocationLog, error) { + + logEntrykey := mig24.MakeLogKey(updateNum) + commitBytes := log.Get(logEntrykey[:]) + if commitBytes == nil { + return RevocationLog{}, ErrLogEntryNotFound + } + + commitReader := bytes.NewReader(commitBytes) + + return deserializeRevocationLog(commitReader) +} + +// serializeRevocationLog serializes a RevocationLog record based on tlv +// format. +func serializeRevocationLog(w io.Writer, rl *RevocationLog) error { + // Create the tlv stream. + tlvStream, err := rl.toTlvStream() + if err != nil { + return err + } + + // Write the tlv stream. + if err := writeTlvStream(w, tlvStream); err != nil { + return err + } + + // Write the HTLCs. + return serializeHTLCEntries(w, rl.HTLCEntries) +} + +// serializeHTLCEntries serializes a list of HTLCEntry records based on tlv +// format. +func serializeHTLCEntries(w io.Writer, htlcs []*HTLCEntry) error { + for _, htlc := range htlcs { + // Patch the incomingTlv field. + if htlc.Incoming { + htlc.incomingTlv = 1 + } + + // Patch the amtTlv field. + htlc.amtTlv = uint64(htlc.Amt) + + // Create the tlv stream. + tlvStream, err := htlc.toTlvStream() + if err != nil { + return err + } + + // Write the tlv stream. + if err := writeTlvStream(w, tlvStream); err != nil { + return err + } + } + + return nil +} + +// deserializeRevocationLog deserializes a RevocationLog based on tlv format. +func deserializeRevocationLog(r io.Reader) (RevocationLog, error) { + var rl RevocationLog + + // Create the tlv stream. + tlvStream, err := rl.toTlvStream() + if err != nil { + return rl, err + } + + // Read the tlv stream. + if err := readTlvStream(r, tlvStream); err != nil { + return rl, err + } + + // Read the HTLC entries. + rl.HTLCEntries, err = deserializeHTLCEntries(r) + + return rl, err +} + +// deserializeHTLCEntries deserializes a list of HTLC entries based on tlv +// format. +func deserializeHTLCEntries(r io.Reader) ([]*HTLCEntry, error) { + var htlcs []*HTLCEntry + + for { + var htlc HTLCEntry + + // Create the tlv stream. + tlvStream, err := htlc.toTlvStream() + if err != nil { + return nil, err + } + + // Read the HTLC entry. + if err := readTlvStream(r, tlvStream); err != nil { + // We've reached the end when hitting an EOF. + if err == io.ErrUnexpectedEOF { + break + } + return nil, err + } + + // Patch the Incoming field. + if htlc.incomingTlv == 1 { + htlc.Incoming = true + } + + // Patch the Amt field. + htlc.Amt = btcutil.Amount(htlc.amtTlv) + + // Append the entry. + htlcs = append(htlcs, &htlc) + } + + return htlcs, nil +} + +// writeTlvStream is a helper function that encodes the tlv stream into the +// writer. +func writeTlvStream(w io.Writer, s *tlv.Stream) error { + var b bytes.Buffer + if err := s.Encode(&b); err != nil { + return err + } + // Write the stream's length as a varint. + err := tlv.WriteVarInt(w, uint64(b.Len()), &[8]byte{}) + if err != nil { + return err + } + + if _, err = w.Write(b.Bytes()); err != nil { + return err + } + + return nil +} + +// readTlvStream is a helper function that decodes the tlv stream from the +// reader. +func readTlvStream(r io.Reader, s *tlv.Stream) error { + var bodyLen uint64 + + // Read the stream's length. + bodyLen, err := tlv.ReadVarInt(r, &[8]byte{}) + switch { + // We'll convert any EOFs to ErrUnexpectedEOF, since this results in an + // invalid record. + case err == io.EOF: + return io.ErrUnexpectedEOF + + // Other unexpected errors. + case err != nil: + return err + } + + // TODO(yy): add overflow check. + lr := io.LimitReader(r, int64(bodyLen)) + return s.Decode(lr) +} + +// fetchLogBucket returns a read bucket by visiting both the new and the old +// bucket. +func fetchLogBucket(chanBucket kvdb.RBucket) (kvdb.RBucket, error) { + logBucket := chanBucket.NestedReadBucket(revocationLogBucket) + if logBucket == nil { + logBucket = chanBucket.NestedReadBucket( + revocationLogBucketDeprecated, + ) + if logBucket == nil { + return nil, mig25.ErrNoPastDeltas + } + } + + return logBucket, nil +} + +// putOldRevocationLog saves a revocation log using the old format. +func putOldRevocationLog(log kvdb.RwBucket, + commit *mig.ChannelCommitment) error { + + var b bytes.Buffer + if err := mig.SerializeChanCommit(&b, commit); err != nil { + return err + } + + logEntrykey := mig24.MakeLogKey(commit.CommitHeight) + return log.Put(logEntrykey[:], b.Bytes()) +} + +func putChanRevocationState(chanBucket kvdb.RwBucket, + channel *mig26.OpenChannel) error { + + var b bytes.Buffer + err := mig.WriteElements( + &b, channel.RemoteCurrentRevocation, channel.RevocationProducer, + channel.RevocationStore, + ) + if err != nil { + return err + } + + // TODO(roasbeef): don't keep producer on disk + + // If the next revocation is present, which is only the case after the + // FundingLocked message has been sent, then we'll write it to disk. + if channel.RemoteNextRevocation != nil { + err = mig.WriteElements(&b, channel.RemoteNextRevocation) + if err != nil { + return err + } + } + + return chanBucket.Put(revocationStateKey, b.Bytes()) +} + +func fetchChanRevocationState(chanBucket kvdb.RBucket, + c *mig26.OpenChannel) error { + + revBytes := chanBucket.Get(revocationStateKey) + if revBytes == nil { + return ErrNoRevocationsFound + } + r := bytes.NewReader(revBytes) + + err := mig.ReadElements( + r, &c.RemoteCurrentRevocation, &c.RevocationProducer, + &c.RevocationStore, + ) + if err != nil { + return err + } + + // If there aren't any bytes left in the buffer, then we don't yet have + // the next remote revocation, so we can exit early here. + if r.Len() == 0 { + return nil + } + + // Otherwise we'll read the next revocation for the remote party which + // is always the last item within the buffer. + return mig.ReadElements(r, &c.RemoteNextRevocation) +} + +func findOutputIndexes(chanState *mig26.OpenChannel, + oldLog *mig.ChannelCommitment) (uint32, uint32, error) { + + // With the state number broadcast known, we can now derive/restore the + // proper revocation preimage necessary to sweep the remote party's + // output. + revocationPreimage, err := chanState.RevocationStore.LookUp( + oldLog.CommitHeight, + ) + if err != nil { + return 0, 0, err + } + + return findOutputIndexesFromRemote( + revocationPreimage, chanState, oldLog, + ) +} diff --git a/channeldb/migration30/test_mock.go b/channeldb/migration30/test_mock.go new file mode 100644 index 0000000000..0246ca90d8 --- /dev/null +++ b/channeldb/migration30/test_mock.go @@ -0,0 +1,51 @@ +package migration30 + +import ( + "encoding/binary" + "io" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/shachain" + "github.com/stretchr/testify/mock" +) + +// mockStore mocks the shachain.Store. +type mockStore struct { + mock.Mock +} + +// A compile time check to ensure mockStore implements the Store interface. +var _ shachain.Store = (*mockStore)(nil) + +func (m *mockStore) LookUp(height uint64) (*chainhash.Hash, error) { + args := m.Called(height) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + return args.Get(0).(*chainhash.Hash), args.Error(1) +} + +func (m *mockStore) AddNextEntry(preimage *chainhash.Hash) error { + args := m.Called(preimage) + + return args.Error(0) +} + +// Encode encodes a series of dummy values to pass the serialize/deserialize +// process. +func (m *mockStore) Encode(w io.Writer) error { + err := binary.Write(w, binary.BigEndian, int8(1)) + if err != nil { + return err + } + + if err := binary.Write(w, binary.BigEndian, uint64(0)); err != nil { + return err + } + if _, err = w.Write(preimage2); err != nil { + return err + } + return binary.Write(w, binary.BigEndian, uint64(0)) +} diff --git a/channeldb/migration30/test_utils.go b/channeldb/migration30/test_utils.go new file mode 100644 index 0000000000..c964dfb75b --- /dev/null +++ b/channeldb/migration30/test_utils.go @@ -0,0 +1,554 @@ +package migration30 + +import ( + "bytes" + "fmt" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/shachain" + + lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21" + mig25 "github.com/lightningnetwork/lnd/channeldb/migration25" + mig26 "github.com/lightningnetwork/lnd/channeldb/migration26" + mig "github.com/lightningnetwork/lnd/channeldb/migration_01_to_11" +) + +var ( + testChainHash = chainhash.Hash{1, 2, 3} + testChanType = mig25.SingleFunderTweaklessBit + + // testOurIndex and testTheirIndex are artificial indexes that're saved + // to db during test setup. They are different from indexes populated + // from the actual migration process so we can check whether a new + // revocation log is overwritten or not. + testOurIndex = uint32(100) + testTheirIndex = uint32(200) + + // dummyInput is used in our commit tx. + dummyInput = &wire.TxIn{ + PreviousOutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: 0xffffffff, + }, + Sequence: 0xffffffff, + } + + // htlcScript is the PkScript used in the HTLC output. This script + // corresponds to revocation preimage2. + htlcScript = []byte{ + 0x0, 0x20, 0x3d, 0x51, 0x66, 0xda, 0x39, 0x93, + 0x7b, 0x49, 0xaf, 0x2, 0xf2, 0x2f, 0x90, 0x52, + 0x8e, 0x45, 0x24, 0x34, 0x8f, 0xd8, 0x76, 0x7, + 0x5a, 0xfc, 0x52, 0x8d, 0x68, 0xdd, 0xbc, 0xce, + 0x3e, 0x5d, + } + + // toLocalScript is the PkScript used in to-local output. + toLocalScript = []byte{ + 0x0, 0x14, 0xc6, 0x9, 0x62, 0xab, 0x60, 0xbe, + 0x40, 0xd, 0xab, 0x31, 0xc, 0x13, 0x14, 0x15, + 0x93, 0xe6, 0xa2, 0x94, 0xe4, 0x2a, + } + + // preimage1 defines a revocation preimage, generated from itest. + preimage1 = []byte{ + 0x95, 0xb4, 0x7c, 0x5a, 0x2b, 0xfd, 0x6f, 0xf4, + 0x70, 0x8, 0xc, 0x70, 0x82, 0x36, 0xc8, 0x5, + 0x88, 0x16, 0xaf, 0x29, 0xb5, 0x8, 0xfd, 0x5a, + 0x40, 0x28, 0x24, 0xc, 0x2a, 0x7f, 0x96, 0xcd, + } + + // commitTx1 is the tx saved in the first old revocation. + commitTx1 = &wire.MsgTx{ + Version: 2, + // Add a dummy input. + TxIn: []*wire.TxIn{dummyInput}, + TxOut: []*wire.TxOut{ + { + Value: 990_950, + PkScript: toLocalScript, + }, + }, + } + + // logHeight1 is the CommitHeight used by oldLog1. + logHeight1 = uint64(0) + + // oldLog1 defines an old revocation that has no HTLCs. + oldLog1 = mig.ChannelCommitment{ + CommitHeight: logHeight1, + LocalLogIndex: 0, + LocalHtlcIndex: 0, + RemoteLogIndex: 0, + RemoteHtlcIndex: 0, + LocalBalance: lnwire.MilliSatoshi(990_950_000), + RemoteBalance: 0, + CommitTx: commitTx1, + } + + // newLog1 is the new version of oldLog1. + newLog1 = RevocationLog{ + OurOutputIndex: 0, + TheirOutputIndex: OutputIndexEmpty, + CommitTxHash: commitTx1.TxHash(), + } + + // preimage2 defines the second revocation preimage used in the test, + // generated from itest. + preimage2 = []byte{ + 0xac, 0x60, 0x7a, 0x59, 0x9, 0xd6, 0x11, 0xb2, + 0xf5, 0x6e, 0xaa, 0xc6, 0xb9, 0x0, 0x12, 0xdc, + 0xf0, 0x89, 0x58, 0x90, 0x8a, 0xa2, 0xc6, 0xfc, + 0xf1, 0x2, 0x74, 0x87, 0x30, 0x51, 0x5e, 0xea, + } + + // commitTx2 is the tx saved in the second old revocation. + commitTx2 = &wire.MsgTx{ + Version: 2, + // Add a dummy input. + TxIn: []*wire.TxIn{dummyInput}, + TxOut: []*wire.TxOut{ + { + Value: 100_000, + PkScript: htlcScript, + }, + { + Value: 888_800, + PkScript: toLocalScript, + }, + }, + } + + // rHash is the payment hash used in the htlc below. + rHash = [32]byte{ + 0x42, 0x5e, 0xd4, 0xe4, 0xa3, 0x6b, 0x30, 0xea, + 0x21, 0xb9, 0xe, 0x21, 0xc7, 0x12, 0xc6, 0x49, + 0xe8, 0x21, 0x4c, 0x29, 0xb7, 0xea, 0xf6, 0x80, + 0x89, 0xd1, 0x3, 0x9c, 0x6e, 0x55, 0x38, 0x4c, + } + + // htlc defines an HTLC that's saved in the old revocation log. + htlc = mig.HTLC{ + RHash: rHash, + Amt: lnwire.MilliSatoshi(100_000_000), + RefundTimeout: 489, + OutputIndex: 0, + Incoming: false, + OnionBlob: bytes.Repeat([]byte{0xff}, 1366), + HtlcIndex: 0, + LogIndex: 0, + } + + // logHeight2 is the CommitHeight used by oldLog2. + logHeight2 = uint64(1) + + // oldLog2 defines an old revocation that has one HTLC. + oldLog2 = mig.ChannelCommitment{ + CommitHeight: logHeight2, + LocalLogIndex: 1, + LocalHtlcIndex: 1, + RemoteLogIndex: 0, + RemoteHtlcIndex: 0, + LocalBalance: lnwire.MilliSatoshi(888_800_000), + RemoteBalance: 0, + CommitTx: commitTx2, + Htlcs: []mig.HTLC{htlc}, + } + + // newLog2 is the new version of the oldLog2. + newLog2 = RevocationLog{ + OurOutputIndex: 1, + TheirOutputIndex: OutputIndexEmpty, + CommitTxHash: commitTx2.TxHash(), + HTLCEntries: []*HTLCEntry{ + { + RHash: rHash, + RefundTimeout: 489, + OutputIndex: 0, + Incoming: false, + Amt: btcutil.Amount(100_000), + }, + }, + } + + // newLog3 defines an revocation log that's been created after v0.15.0. + newLog3 = mig.ChannelCommitment{ + CommitHeight: logHeight2 + 1, + LocalLogIndex: 1, + LocalHtlcIndex: 1, + RemoteLogIndex: 0, + RemoteHtlcIndex: 0, + LocalBalance: lnwire.MilliSatoshi(888_800_000), + RemoteBalance: 0, + CommitTx: commitTx2, + Htlcs: []mig.HTLC{htlc}, + } + + // The following public keys are taken from the itest results. + localMusigKey, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0xda, 0x42, 0xa4, 0x4a, 0x6b, 0x42, 0xfe, 0xcb, + 0x2f, 0x7e, 0x35, 0x89, 0x99, 0xdd, 0x43, 0xba, + 0x4b, 0xf1, 0x9c, 0xf, 0x18, 0xef, 0x9, 0x83, + 0x35, 0x31, 0x59, 0xa4, 0x3b, 0xde, 0xa, 0xde, + }) + localRevocationBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0x6, 0x16, 0xd1, 0xb1, 0x4f, 0xee, 0x11, 0x86, + 0x55, 0xfe, 0x31, 0x66, 0x6f, 0x43, 0x1, 0x80, + 0xa8, 0xa7, 0x5c, 0x2, 0x92, 0xe5, 0x7c, 0x4, + 0x31, 0xa6, 0xcf, 0x43, 0xb6, 0xdb, 0xe6, 0x10, + }) + localPaymentBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0x88, 0x65, 0x16, 0xc2, 0x37, 0x3f, 0xc5, 0x16, + 0x62, 0x71, 0x0, 0xdd, 0x4d, 0x43, 0x28, 0x43, + 0x32, 0x91, 0x75, 0xcc, 0xd8, 0x81, 0xb6, 0xb0, + 0xd8, 0x96, 0x78, 0xad, 0x18, 0x3b, 0x16, 0xe1, + }) + localDelayBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0xea, 0x41, 0x48, 0x11, 0x2, 0x59, 0xe3, 0x5c, + 0x51, 0x15, 0x90, 0x25, 0x4a, 0x61, 0x5, 0x51, + 0xb3, 0x8, 0xe9, 0xd5, 0xf, 0xc6, 0x91, 0x25, + 0x14, 0xd2, 0xcf, 0xc8, 0xc5, 0x5b, 0xd9, 0x88, + }) + localHtlcBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x3, + 0xfa, 0x1f, 0x6, 0x3a, 0xa4, 0x75, 0x2e, 0x74, + 0x3e, 0x55, 0x9, 0x20, 0x6e, 0xf6, 0xa8, 0xe1, + 0xd7, 0x61, 0x50, 0x75, 0xa8, 0x34, 0x15, 0xc3, + 0x6b, 0xdc, 0xb0, 0xbf, 0xaa, 0x66, 0xd7, 0xa7, + }) + + remoteMultiSigKey, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0x2b, 0x88, 0x7c, 0x6a, 0xf8, 0xb3, 0x51, 0x61, + 0xd3, 0x1c, 0xf1, 0xe4, 0x43, 0xc2, 0x8c, 0x5e, + 0xfa, 0x8e, 0xb5, 0xe9, 0xd0, 0x14, 0xb5, 0x33, + 0x6a, 0xcc, 0xd, 0x11, 0x42, 0xb8, 0x4b, 0x7d, + }) + remoteRevocationBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0x6c, 0x39, 0xa3, 0x6d, 0x93, 0x69, 0xac, 0x14, + 0x1f, 0xbb, 0x4, 0x86, 0x3, 0x82, 0x5, 0xe2, + 0xcb, 0xb0, 0x62, 0x41, 0xa, 0x93, 0x3, 0x6c, + 0x8d, 0xc0, 0x42, 0x4d, 0x9e, 0x51, 0x9b, 0x36, + }) + remotePaymentBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x3, + 0xab, 0x74, 0x1e, 0x83, 0x48, 0xe3, 0xb5, 0x6, + 0x25, 0x1c, 0x80, 0xe7, 0xf2, 0x3e, 0x7d, 0xb7, + 0x7a, 0xc7, 0xd, 0x6, 0x3b, 0xbc, 0x74, 0x96, + 0x8e, 0x9b, 0x2d, 0xd1, 0x42, 0x71, 0xa5, 0x2a, + }) + remoteDelayBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x2, + 0x4b, 0xdd, 0x52, 0x46, 0x1b, 0x50, 0x89, 0xb9, + 0x49, 0x4, 0xf2, 0xd2, 0x98, 0x7d, 0x51, 0xa1, + 0xa6, 0x3f, 0x9b, 0xd0, 0x40, 0x7c, 0x93, 0x74, + 0x3b, 0x8c, 0x4d, 0x63, 0x32, 0x90, 0xa, 0xca, + }) + remoteHtlcBasePoint, _ = btcec.ParsePubKey([]byte{ + 0x3, + 0x5b, 0x8f, 0x4a, 0x71, 0x4c, 0x2e, 0x71, 0x14, + 0x86, 0x1f, 0x30, 0x96, 0xc0, 0xd4, 0x11, 0x76, + 0xf8, 0xc3, 0xfc, 0x7, 0x2d, 0x15, 0x99, 0x55, + 0x8, 0x69, 0xf6, 0x1, 0xa2, 0xcd, 0x6b, 0xa7, + }) +) + +// setupTestLogs takes care of creating the related buckets and inserts testing +// records. +func setupTestLogs(db kvdb.Backend, c *mig26.OpenChannel, + oldLogs, newLogs []mig.ChannelCommitment) error { + + return kvdb.Update(db, func(tx kvdb.RwTx) error { + // If the open channel is nil, only create the root + // bucket and skip creating the channel bucket. + if c == nil { + _, err := tx.CreateTopLevelBucket(openChannelBucket) + return err + } + + // Create test buckets. + chanBucket, err := mig25.CreateChanBucket(tx, &c.OpenChannel) + if err != nil { + return err + } + + // Save channel info. + if err := mig26.PutChanInfo(chanBucket, c, false); err != nil { + return fmt.Errorf("PutChanInfo got %v", err) + } + + // Save revocation state. + if err := putChanRevocationState(chanBucket, c); err != nil { + return fmt.Errorf("putChanRevocationState got %v", err) + } + + // Create old logs. + err = writeOldRevocationLogs(chanBucket, oldLogs) + if err != nil { + return fmt.Errorf("write old logs: %v", err) + } + + // Create new logs. + return writeNewRevocationLogs(chanBucket, newLogs) + }, func() {}) +} + +// createTestChannel creates an OpenChannel using the specified nodePub and +// outpoint. If any of the params is nil, a random value is populated. +func createTestChannel(nodePub *btcec.PublicKey) *mig26.OpenChannel { + // Create a random private key that's used to provide randomness. + priv, _ := btcec.NewPrivateKey() + + // If passed public key is nil, use the random public key. + if nodePub == nil { + nodePub = priv.PubKey() + } + + // Create a random channel point. + var op wire.OutPoint + copy(op.Hash[:], priv.Serialize()) + + testProducer := shachain.NewRevocationProducer(op.Hash) + store, _ := createTestStore() + + localCfg := mig.ChannelConfig{ + ChannelConstraints: mig.ChannelConstraints{ + DustLimit: btcutil.Amount(354), + MaxAcceptedHtlcs: 483, + CsvDelay: 4, + }, + MultiSigKey: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 0, + Index: 0, + }, + PubKey: localMusigKey, + }, + RevocationBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 1, + Index: 0, + }, + PubKey: localRevocationBasePoint, + }, + HtlcBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 2, + Index: 0, + }, + PubKey: localHtlcBasePoint, + }, + PaymentBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 3, + Index: 0, + }, + PubKey: localPaymentBasePoint, + }, + DelayBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 4, + Index: 0, + }, + PubKey: localDelayBasePoint, + }, + } + + remoteCfg := mig.ChannelConfig{ + ChannelConstraints: mig.ChannelConstraints{ + DustLimit: btcutil.Amount(354), + MaxAcceptedHtlcs: 483, + CsvDelay: 4, + }, + MultiSigKey: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 0, + Index: 0, + }, + PubKey: remoteMultiSigKey, + }, + RevocationBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 0, + Index: 0, + }, + PubKey: remoteRevocationBasePoint, + }, + HtlcBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 0, + Index: 0, + }, + PubKey: remoteHtlcBasePoint, + }, + PaymentBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 0, + Index: 0, + }, + PubKey: remotePaymentBasePoint, + }, + DelayBasePoint: keychain.KeyDescriptor{ + KeyLocator: keychain.KeyLocator{ + Family: 0, + Index: 0, + }, + PubKey: remoteDelayBasePoint, + }, + } + + c := &mig26.OpenChannel{ + OpenChannel: mig25.OpenChannel{ + OpenChannel: mig.OpenChannel{ + ChainHash: testChainHash, + IdentityPub: nodePub, + FundingOutpoint: op, + LocalChanCfg: localCfg, + RemoteChanCfg: remoteCfg, + // Assign dummy values. + RemoteCurrentRevocation: nodePub, + RevocationProducer: testProducer, + RevocationStore: store, + }, + ChanType: testChanType, + }, + } + + return c +} + +// writeOldRevocationLogs saves an old revocation log to db. +func writeOldRevocationLogs(chanBucket kvdb.RwBucket, + oldLogs []mig.ChannelCommitment) error { + + // Don't bother continue if the logs are empty. + if len(oldLogs) == 0 { + return nil + } + + logBucket, err := chanBucket.CreateBucketIfNotExists( + revocationLogBucketDeprecated, + ) + if err != nil { + return err + } + + for _, c := range oldLogs { + if err := putOldRevocationLog(logBucket, &c); err != nil { + return err + } + } + return nil +} + +// writeNewRevocationLogs saves a new revocation log to db. +func writeNewRevocationLogs(chanBucket kvdb.RwBucket, + oldLogs []mig.ChannelCommitment) error { + + // Don't bother continue if the logs are empty. + if len(oldLogs) == 0 { + return nil + } + + logBucket, err := chanBucket.CreateBucketIfNotExists( + revocationLogBucket, + ) + if err != nil { + return err + } + + for _, c := range oldLogs { + // NOTE: we just blindly write the output indexes to db here + // whereas normally, we would find the correct indexes from the + // old commit tx. We do this intentionally so we can + // distinguish a newly created log from an already saved one. + err := putRevocationLog( + logBucket, &c, testOurIndex, testTheirIndex, + ) + if err != nil { + return err + } + } + return nil +} + +// createTestStore creates a revocation store and always saves the above +// defined two preimages into the store. +func createTestStore() (shachain.Store, error) { + var p chainhash.Hash + copy(p[:], preimage1) + + testStore := shachain.NewRevocationStore() + if err := testStore.AddNextEntry(&p); err != nil { + return nil, err + } + + copy(p[:], preimage2) + if err := testStore.AddNextEntry(&p); err != nil { + return nil, err + } + + return testStore, nil +} + +// createNotStarted will setup a situation where we haven't started the +// migration for the channel. We use the legacy to denote whether to simulate a +// node with v0.15.0. +func createNotStarted(cdb kvdb.Backend, c *mig26.OpenChannel, + legacy bool) error { + + var newLogs []mig.ChannelCommitment + + // Create test logs. + oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2} + + // Add a new log if the node is running with v0.15.0. + if !legacy { + newLogs = []mig.ChannelCommitment{newLog3} + } + return setupTestLogs(cdb, c, oldLogs, newLogs) +} + +// createNotFinished will setup a situation where we have un-migrated logs and +// return the next migration height. We use the legacy to denote whether to +// simulate a node with v0.15.0. +func createNotFinished(cdb kvdb.Backend, c *mig26.OpenChannel, + legacy bool) error { + + // Create test logs. + oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2} + newLogs := []mig.ChannelCommitment{oldLog1} + + // Add a new log if the node is running with v0.15.0. + if !legacy { + newLogs = append(newLogs, newLog3) + } + return setupTestLogs(cdb, c, oldLogs, newLogs) +} + +// createFinished will setup a situation where all the old logs have been +// migrated and return a nil. We use the legacy to denote whether to simulate a +// node with v0.15.0. +func createFinished(cdb kvdb.Backend, c *mig26.OpenChannel, + legacy bool) error { + + // Create test logs. + oldLogs := []mig.ChannelCommitment{oldLog1, oldLog2} + newLogs := []mig.ChannelCommitment{oldLog1, oldLog2} + + // Add a new log if the node is running with v0.15.0. + if !legacy { + newLogs = append(newLogs, newLog3) + } + return setupTestLogs(cdb, c, oldLogs, newLogs) +} diff --git a/channeldb/migtest/migtest.go b/channeldb/migtest/migtest.go index ba723fa7d9..3173128422 100644 --- a/channeldb/migtest/migtest.go +++ b/channeldb/migtest/migtest.go @@ -84,6 +84,45 @@ func ApplyMigration(t *testing.T, } } +// ApplyMigrationWithDb is a helper test function that encapsulates the general +// steps which are needed to properly check the result of applying migration +// function. This function differs from ApplyMigration as it requires the +// supplied migration functions to take a db instance and construct their own +// database transactions. +func ApplyMigrationWithDb(t testing.TB, beforeMigration, afterMigration, + migrationFunc func(db kvdb.Backend) error) { + + t.Helper() + + cdb, cleanUp, err := MakeDB() + defer cleanUp() + if err != nil { + t.Fatal(err) + } + + // beforeMigration usually used for populating the database + // with test data. + if err := beforeMigration(cdb); err != nil { + t.Fatalf("beforeMigration error: %v", err) + } + + // Apply migration. + if err := migrationFunc(cdb); err != nil { + t.Fatalf("migrationFunc error: %v", err) + } + + // If there's no afterMigration, exit here. + if afterMigration == nil { + return + } + + // afterMigration usually used for checking the database state + // and throwing the error if something went wrong. + if err := afterMigration(cdb); err != nil { + t.Fatalf("afterMigration error: %v", err) + } +} + func newError(e interface{}) error { var err error switch e := e.(type) { diff --git a/channeldb/options.go b/channeldb/options.go index 7e121d2545..a3af349d29 100644 --- a/channeldb/options.go +++ b/channeldb/options.go @@ -25,9 +25,18 @@ const ( DefaultPreAllocCacheNumNodes = 15000 ) +// OptionalMiragtionConfig defines the flags used to signal whether a +// particular migration needs to be applied. +type OptionalMiragtionConfig struct { + // PruneRevocationLog specifies that the revocation log migration needs + // to be applied. + PruneRevocationLog bool +} + // Options holds parameters for tuning and customizing a channeldb.DB. type Options struct { kvdb.BoltBackendConfig + OptionalMiragtionConfig // RejectCacheSize is the maximum number of rejectCacheEntries to hold // in the rejection cache. @@ -76,12 +85,13 @@ func DefaultOptions() Options { AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, DBTimeout: kvdb.DefaultDBTimeout, }, - RejectCacheSize: DefaultRejectCacheSize, - ChannelCacheSize: DefaultChannelCacheSize, - PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, - UseGraphCache: true, - NoMigration: false, - clock: clock.NewDefaultClock(), + OptionalMiragtionConfig: OptionalMiragtionConfig{}, + RejectCacheSize: DefaultRejectCacheSize, + ChannelCacheSize: DefaultChannelCacheSize, + PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes, + UseGraphCache: true, + NoMigration: false, + clock: clock.NewDefaultClock(), } } @@ -176,3 +186,11 @@ func OptionKeepFailedPaymentAttempts(keepFailedPaymentAttempts bool) OptionModif o.keepFailedPaymentAttempts = keepFailedPaymentAttempts } } + +// OptionPruneRevocationLog specifies whether the migration for pruning +// revocation logs needs to be applied or not. +func OptionPruneRevocationLog(prune bool) OptionModifier { + return func(o *Options) { + o.OptionalMiragtionConfig.PruneRevocationLog = prune + } +} diff --git a/config_builder.go b/config_builder.go index d43834e0dd..2148981578 100644 --- a/config_builder.go +++ b/config_builder.go @@ -857,6 +857,7 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( channeldb.OptionDryRunMigration(cfg.DryRunMigration), channeldb.OptionSetUseGraphCache(!cfg.DB.NoGraphCache), channeldb.OptionKeepFailedPaymentAttempts(cfg.KeepFailedPaymentAttempts), + channeldb.OptionPruneRevocationLog(cfg.DB.PruneRevocation), } // We want to pre-allocate the channel graph cache according to what we diff --git a/docs/release-notes/release-notes-0.15.0.md b/docs/release-notes/release-notes-0.15.0.md index e3a29b8fe1..0aec864735 100644 --- a/docs/release-notes/release-notes-0.15.0.md +++ b/docs/release-notes/release-notes-0.15.0.md @@ -454,4 +454,4 @@ gRPC performance metrics (latency to process `GetInfo`, etc)](https://github.com * Torkel Rogstad * Vsevolod Kaganovych * Yong Yu -* Ziggie \ No newline at end of file +* Ziggie diff --git a/docs/release-notes/release-notes-0.15.1.md b/docs/release-notes/release-notes-0.15.1.md index 74c2e8b105..894402a492 100644 --- a/docs/release-notes/release-notes-0.15.1.md +++ b/docs/release-notes/release-notes-0.15.1.md @@ -30,6 +30,23 @@ * [Delete failed payment attempts](https://github.com/lightningnetwork/lnd/pull/6438) once payments are settled, unless specified with `keep-failed-payment-attempts` flag. +* [A new db configuration flag + `db.prune-revocation`](https://github.com/lightningnetwork/lnd/pull/6469) is + introduced to take the advantage enabled by [a recent space + optimization](https://github.com/lightningnetwork/lnd/pull/6347). Users can + set this flag to `true` to run an optional db migration during `lnd`'s + startup. This flag will prune the old revocation logs and save them using the + new format that can save large amount of disk space. + For a busy channel with millions of updates, this migration can take quite + some time. The benchmark shows it takes roughly 70 seconds to finish a + migration with 1 million logs. Of course the actual time taken can vary from + machine to machine. Users can run the following benchmark test to get an + accurate time it'll take for a channel with 1 millions updates to plan ahead, + ```sh + cd ./channeldb/migration30 + go test -bench=. -run=TestMigrateRevocationLogMemCap -benchtime=1000000x -timeout=10m -benchmem + ``` + ## Documentation * [Add minor comment](https://github.com/lightningnetwork/lnd/pull/6559) on diff --git a/lncfg/db.go b/lncfg/db.go index 34c7f21374..d60e5609cc 100644 --- a/lncfg/db.go +++ b/lncfg/db.go @@ -61,6 +61,8 @@ type DB struct { Postgres *postgres.Config `group:"postgres" namespace:"postgres" description:"Postgres settings."` NoGraphCache bool `long:"no-graph-cache" description:"Don't use the in-memory graph cache for path finding. Much slower but uses less RAM. Can only be used with a bolt database backend."` + + PruneRevocation bool `long:"prune-revocation" description:"Run the optional migration that prunes the revocation logs to save disk space."` } // DefaultDB creates and returns a new default DB config. diff --git a/sample-lnd.conf b/sample-lnd.conf index 0fd701c668..5904918aba 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -1205,6 +1205,12 @@ litecoin.node=ltcd ; less RAM. Can only be used with a bolt database backend. ; db.no-graph-cache=true +; Specify whether the optional migration for pruning old revocation logs +; should be applied. This migration will only save disk space if there are open +; channels prior to lnd@v0.15.0. +; db.prune-revocation=false + + [etcd] ; Etcd database host.