diff --git a/bin/v0.34.x/db/common/types.go b/bin/v0.34.x/db/common/types.go deleted file mode 100644 index 4f4ee2c..0000000 --- a/bin/v0.34.x/db/common/types.go +++ /dev/null @@ -1,131 +0,0 @@ -package common - -// DB defines mantlemint specific db interface, based off tm-db@0.6.4. -// All future tmdb compat versions need to be provided in each version provider -type DB interface { - // Get fetches the value of the given key, or nil if it does not exist. - // CONTRACT: key, value readonly []byte - Get([]byte) ([]byte, error) - - // Has checks if a key exists. - // CONTRACT: key, value readonly []byte - Has(key []byte) (bool, error) - - // Set sets the value for the given key, replacing it if it already exists. - // CONTRACT: key, value readonly []byte - Set([]byte, []byte) error - - // SetSync sets the value for the given key, and flushes it to storage before returning. - SetSync([]byte, []byte) error - - // Delete deletes the key, or does nothing if the key does not exist. - // CONTRACT: key readonly []byte - Delete([]byte) error - - // DeleteSync deletes the key, and flushes the delete to storage before returning. - DeleteSync([]byte) error - - // Iterator returns an iterator over a domain of keys, in ascending order. The caller must call - // Close when done. End is exclusive, and start must be less than end. A nil start iterates - // from the first key, and a nil end iterates to the last key (inclusive). - // CONTRACT: No writes may happen within a domain while an iterator exists over it. - // CONTRACT: start, end readonly []byte - Iterator(start, end []byte) (Iterator, error) - - // ReverseIterator returns an iterator over a domain of keys, in descending order. The caller - // must call Close when done. End is exclusive, and start must be less than end. A nil end - // iterates from the last key (inclusive), and a nil start iterates to the first key (inclusive). - // CONTRACT: No writes may happen within a domain while an iterator exists over it. - // CONTRACT: start, end readonly []byte - ReverseIterator(start, end []byte) (Iterator, error) - - // Close closes the database connection. - Close() error - - // NewBatch creates a batch for atomic updates. The caller must call Batch.Close. - NewBatch() Batch - - // Print is used for debugging. - Print() error - - // Stats returns a map of property values for all keys and the size of the cache. - Stats() map[string]string -} - -// Batch represents a group of writes. They may or may not be written atomically depending on the -// backend. Callers must call Close on the batch when done. -// -// As with DB, given keys and values should be considered read-only, and must not be modified after -// passing them to the batch. -type Batch interface { - // Set sets a key/value pair. - // CONTRACT: key, value readonly []byte - Set(key, value []byte) error - - // Delete deletes a key/value pair. - // CONTRACT: key readonly []byte - Delete(key []byte) error - - // Write writes the batch, possibly without flushing to disk. Only Close() can be called after, - // other methods will error. - Write() error - - // WriteSync writes the batch and flushes it to disk. Only Close() can be called after, other - // methods will error. - WriteSync() error - - // Close closes the batch. It is idempotent, but calls to other methods afterwards will error. - Close() error -} - -// Iterator represents an iterator over a domain of keys. Callers must call Close when done. -// No writes can happen to a domain while there exists an iterator over it, some backends may take -// out database locks to ensure this will not happen. -// -// Callers must make sure the iterator is valid before calling any methods on it, otherwise -// these methods will panic. This is in part caused by most backend databases using this convention. -// -// As with DB, keys and values should be considered read-only, and must be copied before they are -// modified. -// -// Typical usage: -// -// var itr Iterator = ... -// defer itr.Close() -// -// for ; itr.Valid(); itr.Next() { -// k, v := itr.Key(); itr.Value() -// ... -// } -// if err := itr.Error(); err != nil { -// ... -// } -type Iterator interface { - // Domain returns the start (inclusive) and end (exclusive) limits of the iterator. - // CONTRACT: start, end readonly []byte - Domain() (start []byte, end []byte) - - // Valid returns whether the current iterator is valid. Once invalid, the Iterator remains - // invalid forever. - Valid() bool - - // Next moves the iterator to the next key in the database, as defined by order of iteration. - // If Valid returns false, this method will panic. - Next() - - // Key returns the key at the current position. Panics if the iterator is invalid. - // CONTRACT: key readonly []byte - Key() (key []byte) - - // Value returns the value at the current position. Panics if the iterator is invalid. - // CONTRACT: value readonly []byte - Value() (value []byte) - - // Error returns the last error encountered by the iterator, if any. - Error() error - - // Close closes the iterator, relasing any allocated resources. - Close() error -} - - diff --git a/bin/v0.34.x/db/heleveldb/config.go b/bin/v0.34.x/db/heleveldb/config.go new file mode 100644 index 0000000..0cf840e --- /dev/null +++ b/bin/v0.34.x/db/heleveldb/config.go @@ -0,0 +1,7 @@ +package heleveldb + +type DriverConfig struct { + Name string + Dir string + Mode int +} diff --git a/bin/v0.34.x/db/heleveldb/leveldb_batch.go b/bin/v0.34.x/db/heleveldb/leveldb_batch.go new file mode 100644 index 0000000..ccb9f76 --- /dev/null +++ b/bin/v0.34.x/db/heleveldb/leveldb_batch.go @@ -0,0 +1,77 @@ +package heleveldb + +import ( + "math" + + tmdb "github.com/tendermint/tm-db" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/hld" + "github.com/terra-money/mantlemint/lib" +) + +var _ hld.HeightLimitEnabledBatch = (*LevelBatch)(nil) + +type LevelBatch struct { + height int64 + batch tmdb.Batch + mode int +} + +func (b *LevelBatch) keyBytesWithHeight(key []byte) []byte { + if b.mode == DriverModeKeySuffixAsc { + return append(prefixDataWithHeightKey(key), lib.UintToBigEndian(uint64(b.height))...) + } else { + return append(prefixDataWithHeightKey(key), lib.UintToBigEndian(math.MaxUint64-uint64(b.height))...) + } + +} + +func NewLevelDBBatch(atHeight int64, driver *Driver) *LevelBatch { + return &LevelBatch{ + height: atHeight, + batch: driver.session.NewBatch(), + mode: driver.mode, + } +} + +func (b *LevelBatch) Set(key, value []byte) error { + newKey := b.keyBytesWithHeight(key) + + // make fixed size byte slice for performance + buf := make([]byte, 0, len(value)+1) + buf = append(buf, byte(0)) // 0 => not deleted + buf = append(buf, value...) + + if err := b.batch.Set(prefixCurrentDataKey(key), buf[1:]); err != nil { + return err + } + if err := b.batch.Set(prefixKeysForIteratorKey(key), []byte{}); err != nil { + return err + } + return b.batch.Set(newKey, buf) +} + +func (b *LevelBatch) Delete(key []byte) error { + newKey := b.keyBytesWithHeight(key) + + buf := []byte{1} + + if err := b.batch.Delete(prefixCurrentDataKey(key)); err != nil { + return err + } + if err := b.batch.Set(prefixKeysForIteratorKey(key), buf); err != nil { + return err + } + return b.batch.Set(newKey, buf) +} + +func (b *LevelBatch) Write() error { + return b.batch.Write() +} + +func (b *LevelBatch) WriteSync() error { + return b.batch.WriteSync() +} + +func (b *LevelBatch) Close() error { + return b.batch.Close() +} diff --git a/bin/v0.34.x/db/heleveldb/leveldb_driver.go b/bin/v0.34.x/db/heleveldb/leveldb_driver.go new file mode 100644 index 0000000..b999b61 --- /dev/null +++ b/bin/v0.34.x/db/heleveldb/leveldb_driver.go @@ -0,0 +1,155 @@ +package heleveldb + +import ( + "fmt" + "math" + + tmdb "github.com/tendermint/tm-db" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/hld" + "github.com/terra-money/mantlemint/lib" +) + +type Driver struct { + session *tmdb.GoLevelDB + mode int +} + +func NewLevelDBDriver(config *DriverConfig) (*Driver, error) { + ldb, err := tmdb.NewGoLevelDB(config.Name, config.Dir) + if err != nil { + return nil, err + } + + return &Driver{ + session: ldb, + mode: config.Mode, + }, nil +} + +func (d *Driver) newInnerIterator(requestHeight int64, pdb *tmdb.PrefixDB) (tmdb.Iterator, error) { + if d.mode == DriverModeKeySuffixAsc { + heightEnd := lib.UintToBigEndian(uint64(requestHeight + 1)) + return pdb.ReverseIterator(nil, heightEnd) + } else { + heightStart := lib.UintToBigEndian(math.MaxUint64 - uint64(requestHeight)) + return pdb.Iterator(heightStart, nil) + } +} + +func (d *Driver) Get(maxHeight int64, key []byte) ([]byte, error) { + if maxHeight == 0 { + return d.session.Get(prefixCurrentDataKey(key)) + } + var requestHeight = hld.Height(maxHeight).CurrentOrLatest().ToInt64() + var requestHeightMin = hld.Height(0).CurrentOrNever().ToInt64() + + // check if requestHeightMin is + if requestHeightMin > requestHeight { + return nil, fmt.Errorf("invalid height") + } + + pdb := tmdb.NewPrefixDB(d.session, prefixDataWithHeightKey(key)) + + iter, _ := d.newInnerIterator(requestHeight, pdb) + defer iter.Close() + + // in tm-db@v0.6.4, key not found is NOT an error + if !iter.Valid() { + return nil, nil + } + + value := iter.Value() + deleted := value[0] + if deleted == 1 { + return nil, nil + } else { + if len(value) > 1 { + return value[1:], nil + } + return []byte{}, nil + } +} + +func (d *Driver) Has(maxHeight int64, key []byte) (bool, error) { + if maxHeight == 0 { + return d.session.Has(prefixCurrentDataKey(key)) + } + var requestHeight = hld.Height(maxHeight).CurrentOrLatest().ToInt64() + var requestHeightMin = hld.Height(0).CurrentOrNever().ToInt64() + + // check if requestHeightMin is + if requestHeightMin > requestHeight { + return false, fmt.Errorf("invalid height") + } + + pdb := tmdb.NewPrefixDB(d.session, prefixDataWithHeightKey(key)) + + iter, _ := d.newInnerIterator(requestHeight, pdb) + defer iter.Close() + + // in tm-db@v0.6.4, key not found is NOT an error + if !iter.Valid() { + return false, nil + } + + deleted := iter.Value()[0] + + if deleted == 1 { + return false, nil + } else { + return true, nil + } +} + +func (d *Driver) Set(atHeight int64, key, value []byte) error { + // should never reach here, all should be batched in tiered+hld + panic("should never reach here") +} + +func (d *Driver) SetSync(atHeight int64, key, value []byte) error { + // should never reach here, all should be batched in tiered+hld + panic("should never reach here") +} + +func (d *Driver) Delete(atHeight int64, key []byte) error { + // should never reach here, all should be batched in tiered+hld + panic("should never reach here") +} + +func (d *Driver) DeleteSync(atHeight int64, key []byte) error { + return d.Delete(atHeight, key) +} + +func (d *Driver) Iterator(maxHeight int64, start, end []byte) (hld.HeightLimitEnabledIterator, error) { + if maxHeight == 0 { + pdb := tmdb.NewPrefixDB(d.session, cCurrentDataPrefix) + return pdb.Iterator(start, end) + } + return NewLevelDBIterator(d, maxHeight, start, end) +} + +func (d *Driver) ReverseIterator(maxHeight int64, start, end []byte) (hld.HeightLimitEnabledIterator, error) { + if maxHeight == 0 { + pdb := tmdb.NewPrefixDB(d.session, cCurrentDataPrefix) + return pdb.ReverseIterator(start, end) + } + return NewLevelDBReverseIterator(d, maxHeight, start, end) +} + +func (d *Driver) Close() error { + d.session.Close() + return nil +} + +func (d *Driver) NewBatch(atHeight int64) hld.HeightLimitEnabledBatch { + return NewLevelDBBatch(atHeight, d) +} + +// TODO: Implement me +func (d *Driver) Print() error { + return nil +} + +func (d *Driver) Stats() map[string]string { + return nil +} diff --git a/bin/v0.34.x/db/heleveldb/leveldb_iterator.go b/bin/v0.34.x/db/heleveldb/leveldb_iterator.go new file mode 100644 index 0000000..2a02495 --- /dev/null +++ b/bin/v0.34.x/db/heleveldb/leveldb_iterator.go @@ -0,0 +1,92 @@ +package heleveldb + +import ( + "bytes" + + tmdb "github.com/tendermint/tm-db" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/hld" +) + +var _ hld.HeightLimitEnabledIterator = (*Iterator)(nil) + +type Iterator struct { + driver *Driver + tmdb.Iterator + + maxHeight int64 + start []byte + end []byte + + // caching last validated key and value + // since Valid and Value functions are expensive but called repeatedly + lastValidKey []byte + lastValidValue []byte +} + +func NewLevelDBIterator(d *Driver, maxHeight int64, start, end []byte) (*Iterator, error) { + pdb := tmdb.NewPrefixDB(d.session, cKeysForIteratorPrefix) + iter, err := pdb.Iterator(start, end) + if err != nil { + return nil, err + } + + return &Iterator{ + driver: d, + Iterator: iter, + + maxHeight: maxHeight, + start: start, + end: end, + }, nil +} +func NewLevelDBReverseIterator(d *Driver, maxHeight int64, start, end []byte) (*Iterator, error) { + pdb := tmdb.NewPrefixDB(d.session, cKeysForIteratorPrefix) + iter, err := pdb.ReverseIterator(start, end) + if err != nil { + return nil, err + } + + return &Iterator{ + driver: d, + Iterator: iter, + + maxHeight: maxHeight, + start: start, + end: end, + }, nil +} + +func (i *Iterator) Domain() (start []byte, end []byte) { + panic("implement me") +} + +func (i *Iterator) Valid() bool { + // filter out items with Deleted = true + // it should return somewhere during the loop + // otherwise iterator has reached the end without finding any record + // with Delete = false, return false in such case. + + for ; i.Iterator.Valid(); i.Iterator.Next() { + if bytes.Equal(i.lastValidKey, i.Key()) { + return true + } + if val, _ := i.driver.Get(i.maxHeight, i.Key()); val != nil { + i.lastValidKey = i.Key() + i.lastValidValue = val + return true + } + } + return false + +} + +func (i *Iterator) Value() (value []byte) { + if bytes.Equal(i.lastValidKey, i.Key()) { + return i.lastValidValue + } + val, err := i.driver.Get(i.maxHeight, i.Key()) + if err != nil { + panic(err) + } + return val +} diff --git a/bin/v0.34.x/db/heleveldb/types.go b/bin/v0.34.x/db/heleveldb/types.go new file mode 100644 index 0000000..cbe457b --- /dev/null +++ b/bin/v0.34.x/db/heleveldb/types.go @@ -0,0 +1,27 @@ +package heleveldb + +const ( + DriverModeKeySuffixAsc = iota + DriverModeKeySuffixDesc +) + +var ( + cCurrentDataPrefix = []byte{0} + cKeysForIteratorPrefix = []byte{1} + cDataWithHeightPrefix = []byte{2} +) + +func prefixCurrentDataKey(key []byte) []byte { + return append(cCurrentDataPrefix, key...) +} + +func prefixKeysForIteratorKey(key []byte) []byte { + return append(cKeysForIteratorPrefix, key...) +} + +func prefixDataWithHeightKey(key []byte) []byte { + result := make([]byte, 0, len(cDataWithHeightPrefix)+len(key)) + result = append(result, cDataWithHeightPrefix...) + result = append(result, key...) + return result +} diff --git a/bin/v0.34.x/db/hld/height_limited_db.go b/bin/v0.34.x/db/hld/height_limited_db.go index 654a862..9bf7939 100644 --- a/bin/v0.34.x/db/hld/height_limited_db.go +++ b/bin/v0.34.x/db/hld/height_limited_db.go @@ -3,9 +3,11 @@ package hld import ( "bytes" "fmt" - "github.com/terra-money/mantlemint-provider-v0.34.x/db/common" - "github.com/terra-money/mantlemint/lib" "sync" + + "github.com/terra-money/mantlemint/lib" + + tmdb "github.com/tendermint/tm-db" ) const ( @@ -55,6 +57,12 @@ func ApplyHeightLimitedDB(db HeightLimitEnabledDB, config *HeightLimitedDBConfig } } +func (hld *HeightLimitedDB) BranchHeightLimitedDB(height int64) *HeightLimitedDB { + newOne := ApplyHeightLimitedDB(hld.odb, hld.config) + newOne.SetReadHeight(height) + return newOne +} + // SetReadHeight sets a target read height in the db driver. // It acts differently if the db mode is writer or reader: // - Reader uses readHeight as the max height at which the retrieved key/value pair is limited to, @@ -143,7 +151,7 @@ func (hld *HeightLimitedDB) DeleteSync(key []byte) error { // from the first key, and a nil end iterates to the last key (inclusive). // CONTRACT: No writes may happen within a domain while an iterator exists over it. // CONTRACT: start, end readonly []byte -func (hld *HeightLimitedDB) Iterator(start, end []byte) (common.Iterator, error) { +func (hld *HeightLimitedDB) Iterator(start, end []byte) (tmdb.Iterator, error) { if bytes.Compare(start, end) == 0 { return nil, fmt.Errorf("invalid iterator operation; start_store_key=%v, end_store_key=%v", start, end) } @@ -156,12 +164,12 @@ func (hld *HeightLimitedDB) Iterator(start, end []byte) (common.Iterator, error) // iterates from the last key (inclusive), and a nil start iterates to the first key (inclusive). // CONTRACT: No writes may happen within a domain while an iterator exists over it. // CONTRACT: start, end readonly []byte -func (hld *HeightLimitedDB) ReverseIterator(start, end []byte) (common.Iterator, error) { +func (hld *HeightLimitedDB) ReverseIterator(start, end []byte) (tmdb.Iterator, error) { if bytes.Compare(start, end) == 0 { return nil, fmt.Errorf("invalid iterator operation; start_store_key=%v, end_store_key=%v", start, end) } - return hld.odb.Iterator(hld.GetCurrentReadHeight(), start, end) + return hld.odb.ReverseIterator(hld.GetCurrentReadHeight(), start, end) } // Close closes the database connection. @@ -170,7 +178,7 @@ func (hld *HeightLimitedDB) Close() error { } // NewBatch creates a batch for atomic updates. The caller must call Batch.Close. -func (hld *HeightLimitedDB) NewBatch() common.Batch { +func (hld *HeightLimitedDB) NewBatch() tmdb.Batch { // if hld.writeBatch != nil { // // TODO: fix me // return hld.writeBatch @@ -179,11 +187,10 @@ func (hld *HeightLimitedDB) NewBatch() common.Batch { // hld.writeBatch = hld.odb.NewBatch(hld.GetCurrentWriteHeight()) // return hld.writeBatch // } - // + // return hld.odb.NewBatch(hld.GetCurrentWriteHeight()) } - // // func (hld *HeightLimitedDB) FlushBatch() error { // hld.writeBatch diff --git a/bin/v0.34.x/db/hld/height_limited_iterator.go b/bin/v0.34.x/db/hld/height_limited_iterator.go index c516c00..62bca9d 100644 --- a/bin/v0.34.x/db/hld/height_limited_iterator.go +++ b/bin/v0.34.x/db/hld/height_limited_iterator.go @@ -1,17 +1,17 @@ package hld -import "github.com/terra-money/mantlemint-provider-v0.34.x/db/common" +import tmdb "github.com/tendermint/tm-db" -var _ common.Iterator = (*HeightLimitedDBIterator)(nil) +var _ tmdb.Iterator = (*HeightLimitedDBIterator)(nil) type HeightLimitedDBIterator struct { - oit common.Iterator + oit tmdb.Iterator atHeight int64 } -func NewHeightLimitedIterator(atHeight int64, oit common.Iterator) common.Iterator { +func NewHeightLimitedIterator(atHeight int64, oit tmdb.Iterator) tmdb.Iterator { return &HeightLimitedDBIterator{ - oit: oit, + oit: oit, atHeight: atHeight, } } diff --git a/bin/v0.34.x/db/hld/types.go b/bin/v0.34.x/db/hld/types.go index c387d39..d9b896b 100644 --- a/bin/v0.34.x/db/hld/types.go +++ b/bin/v0.34.x/db/hld/types.go @@ -1,9 +1,11 @@ package hld -import "github.com/terra-money/mantlemint-provider-v0.34.x/db/common" +import ( + tmdb "github.com/tendermint/tm-db" +) type HLD interface { - common.DB + tmdb.DB SetReadHeight(int64) ClearReadHeight() int64 SetWriteHeight(int64) @@ -61,10 +63,9 @@ type HeightLimitEnabledDB interface { } type HeightLimitEnabledIterator interface { - common.Iterator + tmdb.Iterator } type HeightLimitEnabledBatch interface { - common.Batch + tmdb.Batch } - diff --git a/bin/v0.34.x/db/safe_batch/safe_batch.go b/bin/v0.34.x/db/safe_batch/safe_batch.go index fc39fa9..5a656b8 100644 --- a/bin/v0.34.x/db/safe_batch/safe_batch.go +++ b/bin/v0.34.x/db/safe_batch/safe_batch.go @@ -2,6 +2,7 @@ package safe_batch import ( "fmt" + tmdb "github.com/tendermint/tm-db" ) diff --git a/bin/v0.34.x/go.mod b/bin/v0.34.x/go.mod index fb86d91..bcb30c4 100644 --- a/bin/v0.34.x/go.mod +++ b/bin/v0.34.x/go.mod @@ -4,6 +4,8 @@ go 1.17 require ( github.com/cosmos/cosmos-sdk v0.44.2 + github.com/cosmos/iavl v0.17.1 + github.com/gogo/protobuf v1.3.3 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 @@ -32,7 +34,6 @@ require ( github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/confio/ics23/go v0.6.6 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect - github.com/cosmos/iavl v0.17.1 // indirect github.com/cosmos/ibc-go v1.1.0 // indirect github.com/cosmos/ledger-cosmos-go v0.11.1 // indirect github.com/cosmos/ledger-go v0.9.2 // indirect @@ -50,7 +51,6 @@ require ( github.com/go-logfmt/logfmt v0.5.0 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gogo/gateway v1.1.0 // indirect - github.com/gogo/protobuf v1.3.3 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.0 // indirect diff --git a/bin/v0.34.x/mantlemint/reactor.go b/bin/v0.34.x/mantlemint/reactor.go index 3deb67f..284fecf 100644 --- a/bin/v0.34.x/mantlemint/reactor.go +++ b/bin/v0.34.x/mantlemint/reactor.go @@ -10,10 +10,11 @@ import ( "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" - tendermint "github.com/tendermint/tendermint/types" - tmdb "github.com/tendermint/tm-db" "log" "sync" + + tendermint "github.com/tendermint/tendermint/types" + tmdb "github.com/tendermint/tm-db" ) var _ Mantlemint = (*Instance)(nil) @@ -103,15 +104,21 @@ func (mm *Instance) Init(genesis *tendermint.GenesisDoc) error { return err } - if lastState, err := mm.stateStore.Load(); err != nil { - return err - } else { - mm.lastState = lastState - } + } - mm.lastState.LastResultsHash = merkle.HashFromByteSlices(nil) + return nil +} + +func (mm *Instance) LoadInitialState() error { + if lastState, err := mm.stateStore.Load(); err != nil { + return err + } else { + mm.lastState = lastState } + if mm.lastHeight == 0 { + mm.lastState.LastResultsHash = merkle.HashFromByteSlices(nil) + } return nil } @@ -202,5 +209,5 @@ func (mm *Instance) safeRunAfter(block *tendermint.Block, events *EventCollector } // ---- -func NopRunBefore(block *tendermint.Block) error { return nil } -func NopRunAfter(block *tendermint.Block, events *EventCollector) error { return nil } \ No newline at end of file +func NopRunBefore(block *tendermint.Block) error { return nil } +func NopRunAfter(block *tendermint.Block, events *EventCollector) error { return nil } diff --git a/bin/v0.34.x/mantlemint/types.go b/bin/v0.34.x/mantlemint/types.go index 95489be..a88bcfd 100644 --- a/bin/v0.34.x/mantlemint/types.go +++ b/bin/v0.34.x/mantlemint/types.go @@ -8,6 +8,7 @@ import ( type Mantlemint interface { Inject(*tendermint.Block) error Init(*tendermint.GenesisDoc) error + LoadInitialState() error GetCurrentHeight() int64 GetCurrentBlock() *tendermint.Block GetCurrentState() state.State diff --git a/bin/v0.34.x/rpc/cache.go b/bin/v0.34.x/rpc/cache.go index 52f0fdf..1e84494 100644 --- a/bin/v0.34.x/rpc/cache.go +++ b/bin/v0.34.x/rpc/cache.go @@ -18,10 +18,11 @@ type CacheBackend struct { evictionCount uint64 cacheServeCount uint64 serveCount uint64 + cacheType string mtx *sync.Mutex } -func NewCacheBackend(cacheSize int) *CacheBackend { +func NewCacheBackend(cacheSize int, cacheType string) *CacheBackend { // lru.New cache, err := lru.New(cacheSize) if err != nil { @@ -33,6 +34,7 @@ func NewCacheBackend(cacheSize int) *CacheBackend { evictionCount: 0, cacheServeCount: 0, serveCount: 0, + cacheType: cacheType, mtx: new(sync.Mutex), } } @@ -56,21 +58,23 @@ func (cb *CacheBackend) Get(cacheKey string) *ResponseCache { return data } -func (cb *CacheBackend) Purge() int { - fmt.Printf("[rpc/cache] cache eviction count %d, serveCount %d, cacheServeCount %d\n", +func (cb *CacheBackend) Metric() { + fmt.Printf("[rpc/%s] cache length %d, eviction count %d, serveCount %d, cacheServeCount %d\n", + cb.cacheType, + cb.lru.Len(), cb.evictionCount, cb.serveCount, cb.cacheServeCount, ) +} +func (cb *CacheBackend) Purge() { cb.mtx.Lock() - cacheLen := cb.lru.Len() cb.lru.Purge() cb.evictionCount = 0 cb.cacheServeCount = 0 cb.serveCount = 0 cb.mtx.Unlock() - return cacheLen } func (cb *CacheBackend) HandleCachedHTTP(writer http.ResponseWriter, request *http.Request, handler http.Handler) { diff --git a/bin/v0.34.x/rpc/register.go b/bin/v0.34.x/rpc/register.go index b4f71e5..48d315c 100644 --- a/bin/v0.34.x/rpc/register.go +++ b/bin/v0.34.x/rpc/register.go @@ -2,6 +2,11 @@ package rpc import ( "fmt" + "io/ioutil" + "net/http" + "strconv" + "time" + "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" @@ -13,9 +18,6 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/client" terra "github.com/terra-money/core/app" "github.com/terra-money/core/app/params" - "io/ioutil" - "net/http" - "time" ) func StartRPC( @@ -42,13 +44,23 @@ func StartRPC( WithHomeDir(terra.DefaultNodeHome). WithChainID(chainId) - cache := NewCacheBackend(1024000) + // create backends for response cache + // - cache: used for latest states without `height` parameter + // - archivalCache: used for historical states with `height` parameter; never flushed + cache := NewCacheBackend(16384, "latest") + archivalCache := NewCacheBackend(16384, "archival") // register cache invalidator go func() { for { height := <-invalidateTrigger - fmt.Printf("[cache-middleware] purging cache at height %d, lastLength=%d\n", height, cache.Purge()) + fmt.Printf("[cache-middleware] purging cache at height %d\n", height) + + cache.Metric() + archivalCache.Metric() + + // only purge latest cache + cache.Purge() } }() @@ -78,7 +90,14 @@ func StartRPC( // caching middleware apiSrv.Router.Use(func(next http.Handler) http.Handler { return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - cache.HandleCachedHTTP(writer, request, next) + height, err := strconv.ParseInt(request.URL.Query().Get("height"), 10, 64) + + // don't use archival cache if height is 0 or error + if err == nil && height > 0 { + archivalCache.HandleCachedHTTP(writer, request, next) + } else { + cache.HandleCachedHTTP(writer, request, next) + } }) }) diff --git a/bin/v0.34.x/store/rootmulti/dbadapter.go b/bin/v0.34.x/store/rootmulti/dbadapter.go new file mode 100644 index 0000000..47a3f0d --- /dev/null +++ b/bin/v0.34.x/store/rootmulti/dbadapter.go @@ -0,0 +1,45 @@ +package rootmulti + +import ( + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/types" + dbm "github.com/tendermint/tm-db" +) + +var commithash = []byte("FAKE_HASH") + +//---------------------------------------- +// commitDBStoreWrapper should only be used for simulation/debugging, +// as it doesn't compute any commit hash, and it cannot load older state. + +// Wrapper type for dbm.Db with implementation of KVStore +type commitDBStoreAdapter struct { + dbadapter.Store + prefix []byte +} + +func (cdsa commitDBStoreAdapter) Commit() types.CommitID { + return types.CommitID{ + Version: -1, + Hash: commithash, + } +} + +func (cdsa commitDBStoreAdapter) LastCommitID() types.CommitID { + return types.CommitID{ + Version: -1, + Hash: commithash, + } +} + +func (cdsa commitDBStoreAdapter) SetPruning(_ types.PruningOptions) {} + +// GetPruning is a no-op as pruning options cannot be directly set on this store. +// They must be set on the root commit multi-store. +func (cdsa commitDBStoreAdapter) GetPruning() types.PruningOptions { return types.PruningOptions{} } + +func (cdsa *commitDBStoreAdapter) BranchStoreWithHeightLimitedDB(hldb dbm.DB) types.CommitKVStore { + var db = dbm.NewPrefixDB(hldb, cdsa.prefix) + + return commitDBStoreAdapter{Store: dbadapter.Store{DB: db}, prefix: cdsa.prefix} +} diff --git a/bin/v0.34.x/store/rootmulti/proof.go b/bin/v0.34.x/store/rootmulti/proof.go new file mode 100644 index 0000000..fc8925b --- /dev/null +++ b/bin/v0.34.x/store/rootmulti/proof.go @@ -0,0 +1,27 @@ +package rootmulti + +import ( + "github.com/tendermint/tendermint/crypto/merkle" + + storetypes "github.com/cosmos/cosmos-sdk/store/types" +) + +// RequireProof returns whether proof is required for the subpath. +func RequireProof(subpath string) bool { + // XXX: create a better convention. + // Currently, only when query subpath is "/key", will proof be included in + // response. If there are some changes about proof building in iavlstore.go, + // we must change code here to keep consistency with iavlStore#Query. + return subpath == "/key" +} + +//----------------------------------------------------------------------------- + +// XXX: This should be managed by the rootMultiStore which may want to register +// more proof ops? +func DefaultProofRuntime() (prt *merkle.ProofRuntime) { + prt = merkle.NewProofRuntime() + prt.RegisterOpDecoder(storetypes.ProofOpIAVLCommitment, storetypes.CommitmentOpDecoder) + prt.RegisterOpDecoder(storetypes.ProofOpSimpleMerkleCommitment, storetypes.CommitmentOpDecoder) + return +} diff --git a/bin/v0.34.x/store/rootmulti/store.go b/bin/v0.34.x/store/rootmulti/store.go new file mode 100644 index 0000000..55b15b3 --- /dev/null +++ b/bin/v0.34.x/store/rootmulti/store.go @@ -0,0 +1,1081 @@ +package rootmulti + +import ( + "bufio" + "compress/zlib" + "encoding/binary" + "fmt" + "io" + "math" + "sort" + "strings" + + iavltree "github.com/cosmos/iavl" + protoio "github.com/gogo/protobuf/io" + gogotypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + abci "github.com/tendermint/tendermint/abci/types" + dbm "github.com/tendermint/tm-db" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/hld" + + "github.com/cosmos/cosmos-sdk/snapshots" + snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/cachemulti" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/iavl" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/mem" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/transient" + "github.com/cosmos/cosmos-sdk/store/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" +) + +const ( + latestVersionKey = "s/latest" + pruneHeightsKey = "s/pruneheights" + commitInfoKeyFmt = "s/%d" // s/ + + // Do not change chunk size without new snapshot format (must be uniform across nodes) + snapshotChunkSize = uint64(10e6) + snapshotBufferSize = int(snapshotChunkSize) + snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit +) + +// Store is composed of many CommitStores. Name contrasts with +// cacheMultiStore which is used for branching other MultiStores. It implements +// the CommitMultiStore interface. +type Store struct { + db dbm.DB + hldb *hld.HeightLimitedDB + lastCommitInfo *types.CommitInfo + pruningOpts types.PruningOptions + storesParams map[types.StoreKey]storeParams + stores map[types.StoreKey]types.CommitKVStore + keysByName map[string]types.StoreKey + lazyLoading bool + pruneHeights []int64 + initialVersion int64 + + traceWriter io.Writer + traceContext types.TraceContext + + interBlockCache types.MultiStorePersistentCache + + listeners map[types.StoreKey][]types.WriteListener +} + +var ( + _ types.CommitMultiStore = (*Store)(nil) + _ types.Queryable = (*Store)(nil) +) + +// NewStore returns a reference to a new Store object with the provided DB. The +// store will be created with a PruneNothing pruning strategy by default. After +// a store is created, KVStores must be mounted and finally LoadLatestVersion or +// LoadVersion must be called. +func NewStore(db dbm.DB, hldb *hld.HeightLimitedDB) *Store { + return &Store{ + db: db, + hldb: hldb, + pruningOpts: types.PruneNothing, + storesParams: make(map[types.StoreKey]storeParams), + stores: make(map[types.StoreKey]types.CommitKVStore), + keysByName: make(map[string]types.StoreKey), + pruneHeights: make([]int64, 0), + listeners: make(map[types.StoreKey][]types.WriteListener), + } +} + +// GetPruning fetches the pruning strategy from the root store. +func (rs *Store) GetPruning() types.PruningOptions { + return rs.pruningOpts +} + +// SetPruning sets the pruning strategy on the root store and all the sub-stores. +// Note, calling SetPruning on the root store prior to LoadVersion or +// LoadLatestVersion performs a no-op as the stores aren't mounted yet. +func (rs *Store) SetPruning(pruningOpts types.PruningOptions) { + rs.pruningOpts = pruningOpts +} + +// SetLazyLoading sets if the iavl store should be loaded lazily or not +func (rs *Store) SetLazyLoading(lazyLoading bool) { + rs.lazyLoading = lazyLoading +} + +// GetStoreType implements Store. +func (rs *Store) GetStoreType() types.StoreType { + return types.StoreTypeMulti +} + +// MountStoreWithDB implements CommitMultiStore. +func (rs *Store) MountStoreWithDB(key types.StoreKey, typ types.StoreType, db dbm.DB) { + if key == nil { + panic("MountIAVLStore() key cannot be nil") + } + if _, ok := rs.storesParams[key]; ok { + panic(fmt.Sprintf("store duplicate store key %v", key)) + } + if _, ok := rs.keysByName[key.Name()]; ok { + panic(fmt.Sprintf("store duplicate store key name %v", key)) + } + rs.storesParams[key] = storeParams{ + key: key, + typ: typ, + db: db, + } + rs.keysByName[key.Name()] = key +} + +// GetCommitStore returns a mounted CommitStore for a given StoreKey. If the +// store is wrapped in an inter-block cache, it will be unwrapped before returning. +func (rs *Store) GetCommitStore(key types.StoreKey) types.CommitStore { + return rs.GetCommitKVStore(key) +} + +// GetCommitKVStore returns a mounted CommitKVStore for a given StoreKey. If the +// store is wrapped in an inter-block cache, it will be unwrapped before returning. +func (rs *Store) GetCommitKVStore(key types.StoreKey) types.CommitKVStore { + // If the Store has an inter-block cache, first attempt to lookup and unwrap + // the underlying CommitKVStore by StoreKey. If it does not exist, fallback to + // the main mapping of CommitKVStores. + if rs.interBlockCache != nil { + if store := rs.interBlockCache.Unwrap(key); store != nil { + return store + } + } + + return rs.stores[key] +} + +// LoadLatestVersionAndUpgrade implements CommitMultiStore +func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) error { + ver := getLatestVersion(rs.db) + return rs.loadVersion(ver, upgrades) +} + +// LoadVersionAndUpgrade allows us to rename substores while loading an older version +func (rs *Store) LoadVersionAndUpgrade(ver int64, upgrades *types.StoreUpgrades) error { + return rs.loadVersion(ver, upgrades) +} + +// LoadLatestVersion implements CommitMultiStore. +func (rs *Store) LoadLatestVersion() error { + ver := getLatestVersion(rs.db) + return rs.loadVersion(ver, nil) +} + +// LoadVersion implements CommitMultiStore. +func (rs *Store) LoadVersion(ver int64) error { + return rs.loadVersion(ver, nil) +} + +func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { + infos := make(map[string]types.StoreInfo) + + cInfo := &types.CommitInfo{} + + // load old data if we are not version 0 + if ver != 0 { + var err error + cInfo, err = getCommitInfo(rs.db, ver) + if err != nil { + return err + } + + // convert StoreInfos slice to map + for _, storeInfo := range cInfo.StoreInfos { + infos[storeInfo.Name] = storeInfo + } + } + + // load each Store (note this doesn't panic on unmounted keys now) + var newStores = make(map[types.StoreKey]types.CommitKVStore) + + storesKeys := make([]types.StoreKey, 0, len(rs.storesParams)) + + for key := range rs.storesParams { + storesKeys = append(storesKeys, key) + } + if upgrades != nil { + // deterministic iteration order for upgrades + // (as the underlying store may change and + // upgrades make store changes where the execution order may matter) + sort.Slice(storesKeys, func(i, j int) bool { + return storesKeys[i].Name() < storesKeys[j].Name() + }) + } + + for _, key := range storesKeys { + storeParams := rs.storesParams[key] + commitID := rs.getCommitID(infos, key.Name()) + + // If it has been added, set the initial version + if upgrades.IsAdded(key.Name()) { + storeParams.initialVersion = uint64(ver) + 1 + } + + store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams) + if err != nil { + return errors.Wrap(err, "failed to load store") + } + + newStores[key] = store + + // If it was deleted, remove all data + if upgrades.IsDeleted(key.Name()) { + if err := deleteKVStore(store.(types.KVStore)); err != nil { + return errors.Wrapf(err, "failed to delete store %s", key.Name()) + } + } else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" { + // handle renames specially + // make an unregistered key to satify loadCommitStore params + oldKey := types.NewKVStoreKey(oldName) + oldParams := storeParams + oldParams.key = oldKey + + // load from the old name + oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams) + if err != nil { + return errors.Wrapf(err, "failed to load old store %s", oldName) + } + + // move all data + if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil { + return errors.Wrapf(err, "failed to move store %s -> %s", oldName, key.Name()) + } + } + } + + rs.lastCommitInfo = cInfo + rs.stores = newStores + + // load any pruned heights we missed from disk to be pruned on the next run + ph, err := getPruningHeights(rs.db) + if err == nil && len(ph) > 0 { + rs.pruneHeights = ph + } + + return nil +} + +func (rs *Store) getCommitID(infos map[string]types.StoreInfo, name string) types.CommitID { + info, ok := infos[name] + if !ok { + return types.CommitID{} + } + + return info.CommitId +} + +func deleteKVStore(kv types.KVStore) error { + // Note that we cannot write while iterating, so load all keys here, delete below + var keys [][]byte + itr := kv.Iterator(nil, nil) + for itr.Valid() { + keys = append(keys, itr.Key()) + itr.Next() + } + itr.Close() + + for _, k := range keys { + kv.Delete(k) + } + return nil +} + +// we simulate move by a copy and delete +func moveKVStoreData(oldDB types.KVStore, newDB types.KVStore) error { + // we read from one and write to another + itr := oldDB.Iterator(nil, nil) + for itr.Valid() { + newDB.Set(itr.Key(), itr.Value()) + itr.Next() + } + itr.Close() + + // then delete the old store + return deleteKVStore(oldDB) +} + +// SetInterBlockCache sets the Store's internal inter-block (persistent) cache. +// When this is defined, all CommitKVStores will be wrapped with their respective +// inter-block cache. +func (rs *Store) SetInterBlockCache(c types.MultiStorePersistentCache) { + rs.interBlockCache = c +} + +// SetTracer sets the tracer for the MultiStore that the underlying +// stores will utilize to trace operations. A MultiStore is returned. +func (rs *Store) SetTracer(w io.Writer) types.MultiStore { + rs.traceWriter = w + return rs +} + +// SetTracingContext updates the tracing context for the MultiStore by merging +// the given context with the existing context by key. Any existing keys will +// be overwritten. It is implied that the caller should update the context when +// necessary between tracing operations. It returns a modified MultiStore. +func (rs *Store) SetTracingContext(tc types.TraceContext) types.MultiStore { + if rs.traceContext != nil { + for k, v := range tc { + rs.traceContext[k] = v + } + } else { + rs.traceContext = tc + } + + return rs +} + +// TracingEnabled returns if tracing is enabled for the MultiStore. +func (rs *Store) TracingEnabled() bool { + return rs.traceWriter != nil +} + +// AddListeners adds listeners for a specific KVStore +func (rs *Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) { + if ls, ok := rs.listeners[key]; ok { + rs.listeners[key] = append(ls, listeners...) + } else { + rs.listeners[key] = listeners + } +} + +// ListeningEnabled returns if listening is enabled for a specific KVStore +func (rs *Store) ListeningEnabled(key types.StoreKey) bool { + if ls, ok := rs.listeners[key]; ok { + return len(ls) != 0 + } + return false +} + +// LastCommitID implements Committer/CommitStore. +func (rs *Store) LastCommitID() types.CommitID { + if rs.lastCommitInfo == nil { + return types.CommitID{ + Version: getLatestVersion(rs.db), + } + } + + return rs.lastCommitInfo.CommitID() +} + +// Commit implements Committer/CommitStore. +func (rs *Store) Commit() types.CommitID { + var previousHeight, version int64 + if rs.lastCommitInfo.GetVersion() == 0 && rs.initialVersion > 1 { + // This case means that no commit has been made in the store, we + // start from initialVersion. + version = rs.initialVersion + + } else { + // This case can means two things: + // - either there was already a previous commit in the store, in which + // case we increment the version from there, + // - or there was no previous commit, and initial version was not set, + // in which case we start at version 1. + previousHeight = rs.lastCommitInfo.GetVersion() + version = previousHeight + 1 + } + + rs.lastCommitInfo = commitStores(version, rs.stores) + + // Determine if pruneHeight height needs to be added to the list of heights to + // be pruned, where pruneHeight = (commitHeight - 1) - KeepRecent. + if int64(rs.pruningOpts.KeepRecent) < previousHeight { + pruneHeight := previousHeight - int64(rs.pruningOpts.KeepRecent) + // We consider this height to be pruned iff: + // + // - KeepEvery is zero as that means that all heights should be pruned. + // - KeepEvery % (height - KeepRecent) != 0 as that means the height is not + // a 'snapshot' height. + if rs.pruningOpts.KeepEvery == 0 || pruneHeight%int64(rs.pruningOpts.KeepEvery) != 0 { + rs.pruneHeights = append(rs.pruneHeights, pruneHeight) + } + } + + // batch prune if the current height is a pruning interval height + if rs.pruningOpts.Interval > 0 && version%int64(rs.pruningOpts.Interval) == 0 { + rs.pruneStores() + } + + flushMetadata(rs.db, version, rs.lastCommitInfo, rs.pruneHeights) + + return types.CommitID{ + Version: version, + Hash: rs.lastCommitInfo.Hash(), + } +} + +// pruneStores will batch delete a list of heights from each mounted sub-store. +// Afterwards, pruneHeights is reset. +func (rs *Store) pruneStores() { + if len(rs.pruneHeights) == 0 { + return + } + + for key, store := range rs.stores { + if store.GetStoreType() == types.StoreTypeIAVL { + // If the store is wrapped with an inter-block cache, we must first unwrap + // it to get the underlying IAVL store. + store = rs.GetCommitKVStore(key) + + if err := store.(*iavl.Store).DeleteVersions(rs.pruneHeights...); err != nil { + if errCause := errors.Cause(err); errCause != nil && errCause != iavltree.ErrVersionDoesNotExist { + panic(err) + } + } + } + } + + rs.pruneHeights = make([]int64, 0) +} + +// CacheWrap implements CacheWrapper/Store/CommitStore. +func (rs *Store) CacheWrap() types.CacheWrap { + return rs.CacheMultiStore().(types.CacheWrap) +} + +// CacheWrapWithTrace implements the CacheWrapper interface. +func (rs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap { + return rs.CacheWrap() +} + +// CacheWrapWithListeners implements the CacheWrapper interface. +func (rs *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap { + return rs.CacheWrap() +} + +// CacheMultiStore creates ephemeral branch of the multi-store and returns a CacheMultiStore. +// It implements the MultiStore interface. +func (rs *Store) CacheMultiStore() types.CacheMultiStore { + stores := make(map[types.StoreKey]types.CacheWrapper) + for k, v := range rs.stores { + stores[k] = v + } + return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners) +} + +// CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it +// attempts to load stores at a given version (height). An error is returned if +// any store cannot be loaded. This should only be used for querying and +// iterating at past heights. +func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) { + var hldb = rs.hldb.BranchHeightLimitedDB(version) + + cachedStores := make(map[types.StoreKey]types.CacheWrapper) + for key, store := range rs.stores { + switch store.GetStoreType() { + case types.StoreTypeIAVL: + // If the store is wrapped with an inter-block cache, we must first unwrap + // it to get the underlying IAVL store. + store = rs.GetCommitKVStore(key) + + // Attempt to lazy-load an already saved IAVL store version. If the + // version does not exist or is pruned, an error should be returned. + iavlStore, err := store.(*iavl.Store).GetImmutable(version) + if err != nil { + return nil, err + } + + cachedStores[key] = iavlStore + + case types.StoreTypeDB: + if version == rs.lastCommitInfo.Version { + cachedStores[key] = store + } else { + s := rs.GetCommitKVStore(key).(commitDBStoreAdapter) + // connect new adapter with height limited + cachedStores[key] = s.BranchStoreWithHeightLimitedDB(hldb) + } + + default: + cachedStores[key] = store + } + } + + return cachemulti.NewStore(hldb, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners), nil +} + +// GetStore returns a mounted Store for a given StoreKey. If the StoreKey does +// not exist, it will panic. If the Store is wrapped in an inter-block cache, it +// will be unwrapped prior to being returned. +// +// TODO: This isn't used directly upstream. Consider returning the Store as-is +// instead of unwrapping. +func (rs *Store) GetStore(key types.StoreKey) types.Store { + store := rs.GetCommitKVStore(key) + if store == nil { + panic(fmt.Sprintf("store does not exist for key: %s", key.Name())) + } + + return store +} + +// GetKVStore returns a mounted KVStore for a given StoreKey. If tracing is +// enabled on the KVStore, a wrapped TraceKVStore will be returned with the root +// store's tracer, otherwise, the original KVStore will be returned. +// +// NOTE: The returned KVStore may be wrapped in an inter-block cache if it is +// set on the root store. +func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore { + s := rs.stores[key] + if s == nil { + panic(fmt.Sprintf("store does not exist for key: %s", key.Name())) + } + store := s.(types.KVStore) + + if rs.TracingEnabled() { + store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext) + } + if rs.ListeningEnabled(key) { + store = listenkv.NewStore(store, key, rs.listeners[key]) + } + + return store +} + +// getStoreByName performs a lookup of a StoreKey given a store name typically +// provided in a path. The StoreKey is then used to perform a lookup and return +// a Store. If the Store is wrapped in an inter-block cache, it will be unwrapped +// prior to being returned. If the StoreKey does not exist, nil is returned. +func (rs *Store) getStoreByName(name string) types.Store { + key := rs.keysByName[name] + if key == nil { + return nil + } + + return rs.GetCommitKVStore(key) +} + +// Query calls substore.Query with the same `req` where `req.Path` is +// modified to remove the substore prefix. +// Ie. `req.Path` here is `//`, and trimmed to `/` for the substore. +// TODO: add proof for `multistore -> substore`. +func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery { + path := req.Path + storeName, subpath, err := parsePath(path) + if err != nil { + return sdkerrors.QueryResult(err) + } + + store := rs.getStoreByName(storeName) + if store == nil { + return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no such store: %s", storeName)) + } + + queryable, ok := store.(types.Queryable) + if !ok { + return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "store %s (type %T) doesn't support queries", storeName, store)) + } + + // trim the path and make the query + req.Path = subpath + res := queryable.Query(req) + + if !req.Prove || !RequireProof(subpath) { + return res + } + + if res.ProofOps == nil || len(res.ProofOps.Ops) == 0 { + return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "proof is unexpectedly empty; ensure height has not been pruned")) + } + + // If the request's height is the latest height we've committed, then utilize + // the store's lastCommitInfo as this commit info may not be flushed to disk. + // Otherwise, we query for the commit info from disk. + var commitInfo *types.CommitInfo + + if res.Height == rs.lastCommitInfo.Version { + commitInfo = rs.lastCommitInfo + } else { + commitInfo, err = getCommitInfo(rs.db, res.Height) + if err != nil { + return sdkerrors.QueryResult(err) + } + } + + // Restore origin path and append proof op. + res.ProofOps.Ops = append(res.ProofOps.Ops, commitInfo.ProofOp(storeName)) + + return res +} + +// SetInitialVersion sets the initial version of the IAVL tree. It is used when +// starting a new chain at an arbitrary height. +func (rs *Store) SetInitialVersion(version int64) error { + rs.initialVersion = version + + // Loop through all the stores, if it's an IAVL store, then set initial + // version on it. + for key, store := range rs.stores { + if store.GetStoreType() == types.StoreTypeIAVL { + // If the store is wrapped with an inter-block cache, we must first unwrap + // it to get the underlying IAVL store. + store = rs.GetCommitKVStore(key) + store.(*iavl.Store).SetInitialVersion(version) + } + } + + return nil +} + +// parsePath expects a format like /[/] +// Must start with /, subpath may be empty +// Returns error if it doesn't start with / +func parsePath(path string) (storeName string, subpath string, err error) { + if !strings.HasPrefix(path, "/") { + return storeName, subpath, sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "invalid path: %s", path) + } + + paths := strings.SplitN(path[1:], "/", 2) + storeName = paths[0] + + if len(paths) == 2 { + subpath = "/" + paths[1] + } + + return storeName, subpath, nil +} + +//---------------------- Snapshotting ------------------ + +// Snapshot implements snapshottypes.Snapshotter. The snapshot output for a given format must be +// identical across nodes such that chunks from different sources fit together. If the output for a +// given format changes (at the byte level), the snapshot format must be bumped - see +// TestMultistoreSnapshot_Checksum test. +func (rs *Store) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) { + if format != snapshottypes.CurrentFormat { + return nil, sdkerrors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format) + } + if height == 0 { + return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot snapshot height 0") + } + if height > uint64(rs.LastCommitID().Version) { + return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot snapshot future height %v", height) + } + + // Collect stores to snapshot (only IAVL stores are supported) + type namedStore struct { + *iavl.Store + name string + } + stores := []namedStore{} + for key := range rs.stores { + switch store := rs.GetCommitKVStore(key).(type) { + case *iavl.Store: + stores = append(stores, namedStore{name: key.Name(), Store: store}) + case *transient.Store, *mem.Store: + // Non-persisted stores shouldn't be snapshotted + continue + default: + return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, + "don't know how to snapshot store %q of type %T", key.Name(), store) + } + } + sort.Slice(stores, func(i, j int) bool { + return strings.Compare(stores[i].name, stores[j].name) == -1 + }) + + // Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel + ch := make(chan io.ReadCloser) + go func() { + // Set up a stream pipeline to serialize snapshot nodes: + // ExportNode -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser + chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize) + defer chunkWriter.Close() + bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize) + defer func() { + if err := bufWriter.Flush(); err != nil { + chunkWriter.CloseWithError(err) + } + }() + zWriter, err := zlib.NewWriterLevel(bufWriter, 7) + if err != nil { + chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure")) + return + } + defer func() { + if err := zWriter.Close(); err != nil { + chunkWriter.CloseWithError(err) + } + }() + protoWriter := protoio.NewDelimitedWriter(zWriter) + defer func() { + if err := protoWriter.Close(); err != nil { + chunkWriter.CloseWithError(err) + } + }() + + // Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf + // messages. The first item contains a SnapshotStore with store metadata (i.e. name), + // and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes + // are demarcated by new SnapshotStore items. + for _, store := range stores { + exporter, err := store.Export(int64(height)) + if err != nil { + chunkWriter.CloseWithError(err) + return + } + defer exporter.Close() + err = protoWriter.WriteMsg(&types.SnapshotItem{ + Item: &types.SnapshotItem_Store{ + Store: &types.SnapshotStoreItem{ + Name: store.name, + }, + }, + }) + if err != nil { + chunkWriter.CloseWithError(err) + return + } + + for { + node, err := exporter.Next() + if err == iavltree.ExportDone { + break + } else if err != nil { + chunkWriter.CloseWithError(err) + return + } + err = protoWriter.WriteMsg(&types.SnapshotItem{ + Item: &types.SnapshotItem_IAVL{ + IAVL: &types.SnapshotIAVLItem{ + Key: node.Key, + Value: node.Value, + Height: int32(node.Height), + Version: node.Version, + }, + }, + }) + if err != nil { + chunkWriter.CloseWithError(err) + return + } + } + exporter.Close() + } + }() + + return ch, nil +} + +// Restore implements snapshottypes.Snapshotter. +func (rs *Store) Restore( + height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}, +) error { + if format != snapshottypes.CurrentFormat { + return sdkerrors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format) + } + if height == 0 { + return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot restore snapshot at height 0") + } + if height > uint64(math.MaxInt64) { + return sdkerrors.Wrapf(snapshottypes.ErrInvalidMetadata, + "snapshot height %v cannot exceed %v", height, int64(math.MaxInt64)) + } + + // Signal readiness. Must be done before the readers below are set up, since the zlib + // reader reads from the stream on initialization, potentially causing deadlocks. + if ready != nil { + close(ready) + } + + // Set up a restore stream pipeline + // chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode + chunkReader := snapshots.NewChunkReader(chunks) + defer chunkReader.Close() + zReader, err := zlib.NewReader(chunkReader) + if err != nil { + return sdkerrors.Wrap(err, "zlib failure") + } + defer zReader.Close() + protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize) + defer protoReader.Close() + + // Import nodes into stores. The first item is expected to be a SnapshotItem containing + // a SnapshotStoreItem, telling us which store to import into. The following items will contain + // SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF. + var importer *iavltree.Importer + for { + item := &types.SnapshotItem{} + err := protoReader.ReadMsg(item) + if err == io.EOF { + break + } else if err != nil { + return sdkerrors.Wrap(err, "invalid protobuf message") + } + + switch item := item.Item.(type) { + case *types.SnapshotItem_Store: + if importer != nil { + err = importer.Commit() + if err != nil { + return sdkerrors.Wrap(err, "IAVL commit failed") + } + importer.Close() + } + store, ok := rs.getStoreByName(item.Store.Name).(*iavl.Store) + if !ok || store == nil { + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name) + } + importer, err = store.Import(int64(height)) + if err != nil { + return sdkerrors.Wrap(err, "import failed") + } + defer importer.Close() + + case *types.SnapshotItem_IAVL: + if importer == nil { + return sdkerrors.Wrap(sdkerrors.ErrLogic, "received IAVL node item before store item") + } + if item.IAVL.Height > math.MaxInt8 { + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "node height %v cannot exceed %v", + item.IAVL.Height, math.MaxInt8) + } + node := &iavltree.ExportNode{ + Key: item.IAVL.Key, + Value: item.IAVL.Value, + Height: int8(item.IAVL.Height), + Version: item.IAVL.Version, + } + // Protobuf does not differentiate between []byte{} as nil, but fortunately IAVL does + // not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. + if node.Key == nil { + node.Key = []byte{} + } + if node.Height == 0 && node.Value == nil { + node.Value = []byte{} + } + err := importer.Add(node) + if err != nil { + return sdkerrors.Wrap(err, "IAVL node import failed") + } + + default: + return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", item) + } + } + + if importer != nil { + err := importer.Commit() + if err != nil { + return sdkerrors.Wrap(err, "IAVL commit failed") + } + importer.Close() + } + + flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)), []int64{}) + return rs.LoadLatestVersion() +} + +func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (types.CommitKVStore, error) { + var db dbm.DB + + var prefix []byte + if params.db != nil { + prefix = []byte("s/_/") + db = dbm.NewPrefixDB(params.db, prefix) + } else { + prefix = []byte("s/k:" + params.key.Name() + "/") + db = dbm.NewPrefixDB(rs.db, prefix) + } + + switch params.typ { + case types.StoreTypeMulti: + panic("recursive MultiStores not yet supported") + + case types.StoreTypeIAVL: + var store types.CommitKVStore + var err error + + if params.initialVersion == 0 { + store, err = iavl.LoadStore(db, id, rs.lazyLoading) + } else { + store, err = iavl.LoadStoreWithInitialVersion(db, id, rs.lazyLoading, params.initialVersion) + } + + if err != nil { + return nil, err + } + + if rs.interBlockCache != nil { + // Wrap and get a CommitKVStore with inter-block caching. Note, this should + // only wrap the primary CommitKVStore, not any store that is already + // branched as that will create unexpected behavior. + store = rs.interBlockCache.GetStoreCache(key, store) + } + + return store, err + + case types.StoreTypeDB: + da := commitDBStoreAdapter{Store: dbadapter.Store{DB: db}, prefix: prefix} + return da, nil + + case types.StoreTypeTransient: + _, ok := key.(*types.TransientStoreKey) + if !ok { + return nil, fmt.Errorf("invalid StoreKey for StoreTypeTransient: %s", key.String()) + } + + return transient.NewStore(), nil + + case types.StoreTypeMemory: + if _, ok := key.(*types.MemoryStoreKey); !ok { + return nil, fmt.Errorf("unexpected key type for a MemoryStoreKey; got: %s", key.String()) + } + + return mem.NewStore(), nil + + default: + panic(fmt.Sprintf("unrecognized store type %v", params.typ)) + } +} + +func (rs *Store) buildCommitInfo(version int64) *types.CommitInfo { + storeInfos := []types.StoreInfo{} + for key, store := range rs.stores { + if store.GetStoreType() == types.StoreTypeTransient { + continue + } + storeInfos = append(storeInfos, types.StoreInfo{ + Name: key.Name(), + CommitId: store.LastCommitID(), + }) + } + return &types.CommitInfo{ + Version: version, + StoreInfos: storeInfos, + } +} + +type storeParams struct { + key types.StoreKey + db dbm.DB + typ types.StoreType + initialVersion uint64 +} + +func getLatestVersion(db dbm.DB) int64 { + bz, err := db.Get([]byte(latestVersionKey)) + if err != nil { + panic(err) + } else if bz == nil { + return 0 + } + + var latestVersion int64 + + if err := gogotypes.StdInt64Unmarshal(&latestVersion, bz); err != nil { + panic(err) + } + + return latestVersion +} + +// Commits each store and returns a new commitInfo. +func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore) *types.CommitInfo { + storeInfos := make([]types.StoreInfo, 0, len(storeMap)) + + for key, store := range storeMap { + commitID := store.Commit() + + if store.GetStoreType() == types.StoreTypeTransient { + continue + } + + si := types.StoreInfo{} + si.Name = key.Name() + si.CommitId = commitID + storeInfos = append(storeInfos, si) + } + + return &types.CommitInfo{ + Version: version, + StoreInfos: storeInfos, + } +} + +// Gets commitInfo from disk. +func getCommitInfo(db dbm.DB, ver int64) (*types.CommitInfo, error) { + cInfoKey := fmt.Sprintf(commitInfoKeyFmt, ver) + + bz, err := db.Get([]byte(cInfoKey)) + if err != nil { + return nil, errors.Wrap(err, "failed to get commit info") + } else if bz == nil { + return nil, errors.New("no commit info found") + } + + cInfo := &types.CommitInfo{} + if err = cInfo.Unmarshal(bz); err != nil { + return nil, errors.Wrap(err, "failed unmarshal commit info") + } + + return cInfo, nil +} + +func setCommitInfo(batch dbm.Batch, version int64, cInfo *types.CommitInfo) { + bz, err := cInfo.Marshal() + if err != nil { + panic(err) + } + + cInfoKey := fmt.Sprintf(commitInfoKeyFmt, version) + batch.Set([]byte(cInfoKey), bz) +} + +func setLatestVersion(batch dbm.Batch, version int64) { + bz, err := gogotypes.StdInt64Marshal(version) + if err != nil { + panic(err) + } + + batch.Set([]byte(latestVersionKey), bz) +} + +func setPruningHeights(batch dbm.Batch, pruneHeights []int64) { + bz := make([]byte, 0) + for _, ph := range pruneHeights { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(ph)) + bz = append(bz, buf...) + } + + batch.Set([]byte(pruneHeightsKey), bz) +} + +func getPruningHeights(db dbm.DB) ([]int64, error) { + bz, err := db.Get([]byte(pruneHeightsKey)) + if err != nil { + return nil, fmt.Errorf("failed to get pruned heights: %w", err) + } + if len(bz) == 0 { + return nil, errors.New("no pruned heights found") + } + + prunedHeights := make([]int64, len(bz)/8) + i, offset := 0, 0 + for offset < len(bz) { + prunedHeights[i] = int64(binary.BigEndian.Uint64(bz[offset : offset+8])) + i++ + offset += 8 + } + + return prunedHeights, nil +} + +func flushMetadata(db dbm.DB, version int64, cInfo *types.CommitInfo, pruneHeights []int64) { + batch := db.NewBatch() + defer batch.Close() + + setCommitInfo(batch, version, cInfo) + setLatestVersion(batch, version) + setPruningHeights(batch, pruneHeights) + + if err := batch.Write(); err != nil { + panic(fmt.Errorf("error on batch write %w", err)) + } +} diff --git a/bin/v0.34.x/sync.go b/bin/v0.34.x/sync.go index b308b8c..82c35f9 100644 --- a/bin/v0.34.x/sync.go +++ b/bin/v0.34.x/sync.go @@ -4,6 +4,12 @@ import ( "crypto/sha1" "encoding/hex" "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "runtime/debug" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/simapp" sdk "github.com/cosmos/cosmos-sdk/types" @@ -12,23 +18,20 @@ import ( tmlog "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/proxy" tendermint "github.com/tendermint/tendermint/types" - tmdb "github.com/tendermint/tm-db" terra "github.com/terra-money/core/app" core "github.com/terra-money/core/types" wasmconfig "github.com/terra-money/core/x/wasm/config" blockFeeder "github.com/terra-money/mantlemint-provider-v0.34.x/block_feed" "github.com/terra-money/mantlemint-provider-v0.34.x/config" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/heleveldb" + "github.com/terra-money/mantlemint-provider-v0.34.x/db/hld" "github.com/terra-money/mantlemint-provider-v0.34.x/db/safe_batch" "github.com/terra-money/mantlemint-provider-v0.34.x/indexer" "github.com/terra-money/mantlemint-provider-v0.34.x/indexer/block" "github.com/terra-money/mantlemint-provider-v0.34.x/indexer/tx" "github.com/terra-money/mantlemint-provider-v0.34.x/mantlemint" "github.com/terra-money/mantlemint-provider-v0.34.x/rpc" - "io/ioutil" - "log" - "os" - "path/filepath" - "runtime/debug" + "github.com/terra-money/mantlemint-provider-v0.34.x/store/rootmulti" ) // initialize mantlemint for v0.34.x @@ -53,16 +56,26 @@ func main() { sdkConfig.SetAddressVerifier(core.AddressVerifier) sdkConfig.Seal() - ldb, ldbErr := tmdb.NewGoLevelDB(mantlemintConfig.MantlemintDB, mantlemintConfig.Home) + ldb, ldbErr := heleveldb.NewLevelDBDriver(&heleveldb.DriverConfig{mantlemintConfig.MantlemintDB, mantlemintConfig.Home, heleveldb.DriverModeKeySuffixDesc}) if ldbErr != nil { panic(ldbErr) } - batched := safe_batch.NewSafeBatchDB(ldb) + var hldb = hld.ApplyHeightLimitedDB( + ldb, + &hld.HeightLimitedDBConfig{ + Debug: true, + }, + ) + + batched := safe_batch.NewSafeBatchDB(hldb) batchedOrigin := batched.(safe_batch.SafeBatchDBCloser) logger := tmlog.NewTMLogger(os.Stdout) codec := terra.MakeEncodingConfig() + // customize CMS to limit kv store's read height on query + cms := rootmulti.NewStore(batched, hldb) + var app = terra.NewTerraApp( logger, batched, @@ -79,6 +92,9 @@ func main() { ContractMemoryCacheSize: 2048, }, fauxMerkleModeOpt, + func(ba *baseapp.BaseApp) { + ba.SetCMS(cms) + }, ) // create app... @@ -109,10 +125,26 @@ func main() { ) // initialize using provided genesis - if initErr := mm.Init(getGenesisDoc(mantlemintConfig.GenesisPath)); initErr != nil { + genesisDoc := getGenesisDoc(mantlemintConfig.GenesisPath) + initialHeight := genesisDoc.InitialHeight + hldb.SetWriteHeight(initialHeight) + batchedOrigin.Open() + + if initErr := mm.Init(genesisDoc); initErr != nil { panic(initErr) } + if flushErr := batchedOrigin.Flush(); flushErr != nil { + debug.PrintStack() + panic(flushErr) + } + + if loadErr := mm.LoadInitialState(); loadErr != nil { + panic(loadErr) + } + + hldb.ClearWriteHeight() + // get blocks over some sort of transport, inject to mantlemint blockFeed := blockFeeder.NewAggregateBlockFeed( mm.GetCurrentHeight(), @@ -152,7 +184,6 @@ func main() { }). StartSideSync(mantlemintConfig.IndexerSideSyncPort) }, - // inject flag checker for synced blockFeed.IsSynced, ) @@ -173,6 +204,7 @@ func main() { feed := <-cBlockFeed // open db batch + hldb.SetWriteHeight(feed.Block.Height) batchedOrigin.Open() if injectErr := mm.Inject(feed.Block); injectErr != nil { debug.PrintStack() @@ -185,6 +217,8 @@ func main() { panic(flushErr) } + hldb.ClearWriteHeight() + // run indexer if indexerErr := indexerInstance.Run(feed.Block, feed.BlockID, mm.GetCurrentEventCollector()); indexerErr != nil { debug.PrintStack() @@ -219,6 +253,5 @@ func getGenesisDoc(genesisPath string) *tendermint.GenesisDoc { } func forever() { - for { - } + <-(chan int)(nil) }