diff --git a/.gitignore b/.gitignore index d83c25f5c..2494171ba 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ *.swo *.swp +# Profiling +*.prof + .idea/ vendor/ build/ diff --git a/cmd/importer/importer.go b/cmd/importer/importer.go index ae2adcead..f5c95e255 100644 --- a/cmd/importer/importer.go +++ b/cmd/importer/importer.go @@ -15,17 +15,22 @@ import ( var ( blockchainPath = flag.String("blockchain-path", "", "Path to binary blockchain file.") balancesPath = flag.String("balances-path", "", "Path to JSON with correct balances after applying blocks.") + dataDirPath = flag.String("data-path", "", "Path to directory with previously created state.") nBlocks = flag.Int("blocks-number", 1000, "Number of blocks to import.") ) func main() { flag.Parse() - if len(*blockchainPath) == 0 { + if *blockchainPath == "" { log.Fatalf("You must specify blockchain-path option.") } - dataDir, err := ioutil.TempDir(os.TempDir(), "dataDir") - if err != nil { - log.Fatalf("Faied to create temp dir for data: %v\n", err) + dataDir := *dataDirPath + if dataDir == "" { + tempDir, err := ioutil.TempDir(os.TempDir(), "dataDir") + if err != nil { + log.Fatalf("Faied to create temp dir for data: %v\n", err) + } + dataDir = tempDir } manager, err := state.NewStateManager(dataDir, state.DefaultBlockStorageParams()) if err != nil { @@ -36,13 +41,19 @@ func main() { if err := manager.Close(); err != nil { log.Fatalf("Failed to close StateManager: %v\n", err) } - if err := os.RemoveAll(dataDir); err != nil { - log.Fatalf("Failed to clean data dir: %v\n", err) + if *dataDirPath == "" { + if err := os.RemoveAll(dataDir); err != nil { + log.Fatalf("Failed to clean data dir: %v\n", err) + } } }() + height, err := manager.Height() + if err != nil { + log.Fatalf("Failed to get current height: %v\n", err) + } start := time.Now() - if err := importer.ApplyFromFile(manager, *blockchainPath, *nBlocks, false); err != nil { + if err := importer.ApplyFromFile(manager, *blockchainPath, uint64(*nBlocks), height, false); err != nil { log.Fatalf("Failed to apply blocks: %v\n", err) } elapsed := time.Since(start) diff --git a/pkg/importer/importer.go b/pkg/importer/importer.go index f8758eb90..2e01db9c5 100644 --- a/pkg/importer/importer.go +++ b/pkg/importer/importer.go @@ -12,22 +12,26 @@ import ( "github.com/wavesplatform/gowaves/pkg/proto" ) +const ( + maxBlockSize = 2 * 1024 * 1024 +) + type State interface { AcceptAndVerifyBlockBinary(block []byte, initialisation bool) error GetBlockByHeight(height uint64) (*proto.Block, error) - WavesAddressesNumber() (uint64, error) + AddressesNumber() (uint64, error) AccountBalance(addr proto.Address, asset []byte) (uint64, error) } -func ApplyFromFile(st State, blockchainPath string, nBlocks int, checkBlocks bool) error { +func ApplyFromFile(st State, blockchainPath string, nBlocks, startHeight uint64, checkBlocks bool) error { blockchain, err := os.Open(blockchainPath) if err != nil { return errors.Errorf("failed to open blockchain file: %v\n", err) } sb := make([]byte, 4) - buf := make([]byte, 2*1024*1024) + var buf [maxBlockSize]byte r := bufio.NewReader(blockchain) - for i := 0; i < nBlocks; i++ { + for height := uint64(0); height < nBlocks; height++ { if _, err := io.ReadFull(r, sb); err != nil { return err } @@ -36,20 +40,22 @@ func ApplyFromFile(st State, blockchainPath string, nBlocks int, checkBlocks boo if _, err := io.ReadFull(r, block); err != nil { return err } - if err := st.AcceptAndVerifyBlockBinary(block, true); err != nil { - return err - } - if checkBlocks { - savedBlock, err := st.GetBlockByHeight(uint64(i)) - if err != nil { - return err - } - savedBlockBytes, err := savedBlock.MarshalBinary() - if err != nil { + if height >= startHeight { + if err := st.AcceptAndVerifyBlockBinary(block, true); err != nil { return err } - if bytes.Compare(block, savedBlockBytes) != 0 { - return errors.New("accepted and returned blocks differ\n") + if checkBlocks { + savedBlock, err := st.GetBlockByHeight(height) + if err != nil { + return err + } + savedBlockBytes, err := savedBlock.MarshalBinary() + if err != nil { + return err + } + if bytes.Compare(block, savedBlockBytes) != 0 { + return errors.New("accepted and returned blocks differ\n") + } } } } @@ -69,7 +75,7 @@ func CheckBalances(st State, balancesPath string) error { if err := jsonParser.Decode(&state); err != nil { return errors.Errorf("failed to decode state: %v\n", err) } - addressesNumber, err := st.WavesAddressesNumber() + addressesNumber, err := st.AddressesNumber() if err != nil { return errors.Errorf("failed to get number of waves addresses: %v\n", err) } diff --git a/pkg/keyvalue/common.go b/pkg/keyvalue/common.go index baace3fb6..ef4851074 100644 --- a/pkg/keyvalue/common.go +++ b/pkg/keyvalue/common.go @@ -3,9 +3,11 @@ package keyvalue type KeyValue interface { Has(key []byte) (bool, error) Put(key, val []byte) error + PutDirectly(key, val []byte) error Get(key []byte) ([]byte, error) Delete(key []byte) error Flush() error + Close() error } type Iterator interface { diff --git a/pkg/keyvalue/leveldb.go b/pkg/keyvalue/leveldb.go index e7ca83c88..d7df5c052 100644 --- a/pkg/keyvalue/leveldb.go +++ b/pkg/keyvalue/leveldb.go @@ -35,6 +35,13 @@ func (k *KeyVal) Delete(key []byte) error { return k.db.Delete(key, nil) } +func (k *KeyVal) PutDirectly(key, val []byte) error { + if err := k.db.Put(key, val, nil); err != nil { + return err + } + return nil +} + func (k *KeyVal) Put(key, val []byte) error { if k.batch != nil { k.batch.Put(key, val) @@ -64,3 +71,7 @@ func (k *KeyVal) NewKeyIterator(prefix []byte) (Iterator, error) { return k.db.NewIterator(nil, nil), nil } } + +func (k *KeyVal) Close() error { + return k.db.Close() +} diff --git a/pkg/state/accountsstorage.go b/pkg/state/accountsstorage.go new file mode 100644 index 000000000..7af5b1bfd --- /dev/null +++ b/pkg/state/accountsstorage.go @@ -0,0 +1,291 @@ +package state + +import ( + "encoding/binary" + + "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/keyvalue" +) + +const ( + recordSize = crypto.SignatureSize + 8 +) + +type ID2Height interface { + HeightByBlockID(blockID crypto.Signature) (uint64, error) +} + +type AccountsStorage struct { + genesis crypto.Signature + Db keyvalue.IterableKeyVal + id2Height ID2Height + rollbackMax int +} + +var Empty = []byte{} + +func toBlockID(bytes []byte) (crypto.Signature, error) { + var res crypto.Signature + if len(bytes) != crypto.SignatureSize { + return res, errors.New("failed to convert bytes to block ID: invalid length of bytes") + } + copy(res[:], bytes) + return res, nil +} + +func NewAccountsStorage(genesis crypto.Signature, db keyvalue.IterableKeyVal) (*AccountsStorage, error) { + has, err := db.Has([]byte{DbHeightKeyPrefix}) + if err != nil { + return nil, err + } + if !has { + heightBuf := make([]byte, 8) + binary.LittleEndian.PutUint64(heightBuf, 0) + if err := db.PutDirectly([]byte{DbHeightKeyPrefix}, heightBuf); err != nil { + return nil, err + } + } + return &AccountsStorage{genesis: genesis, Db: db}, nil +} + +func (s *AccountsStorage) SetRollbackMax(rollbackMax int, id2Height ID2Height) { + s.rollbackMax = rollbackMax + s.id2Height = id2Height +} + +func (s *AccountsStorage) SetHeight(height uint64, directly bool) error { + dbHeightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(dbHeightBytes, height) + if directly { + if err := s.Db.PutDirectly([]byte{DbHeightKeyPrefix}, dbHeightBytes); err != nil { + return err + } + } else { + if err := s.Db.Put([]byte{DbHeightKeyPrefix}, dbHeightBytes); err != nil { + return err + } + } + return nil +} + +func (s *AccountsStorage) GetHeight() (uint64, error) { + dbHeightBytes, err := s.Db.Get([]byte{DbHeightKeyPrefix}) + if err != nil { + return 0, err + } + return binary.LittleEndian.Uint64(dbHeightBytes), nil +} + +func (s *AccountsStorage) cutHistory(historyKey []byte, history []byte) ([]byte, error) { + historySize := len(history) + // Always leave at least 1 record. + last := historySize - recordSize + for i := 0; i < last; i += recordSize { + record := history[i : i+recordSize] + idBytes := record[len(record)-crypto.SignatureSize:] + blockID, err := toBlockID(idBytes) + if err != nil { + return nil, err + } + if blockID != s.genesis { + blockHeight, err := s.id2Height.HeightByBlockID(blockID) + if err != nil { + return nil, err + } + currentHeight, err := s.GetHeight() + if err != nil { + return nil, err + } + if currentHeight-blockHeight > uint64(s.rollbackMax) { + history = history[i+recordSize:] + } else { + break + } + } + } + if len(history) != historySize { + // Some records were removed, so we need to update the DB. + if err := s.Db.PutDirectly(historyKey, history); err != nil { + return nil, err + } + } + return history, nil +} + +func (s *AccountsStorage) filterHistory(historyKey []byte, history []byte) ([]byte, error) { + historySize := len(history) + for i := historySize; i >= recordSize; i -= recordSize { + record := history[i-recordSize : i] + idBytes := record[len(record)-crypto.SignatureSize:] + blockID, err := toBlockID(idBytes) + if err != nil { + return nil, err + } + key := BlockIdKey{BlockID: blockID} + has, err := s.Db.Has(key.Bytes()) + if err != nil { + return nil, err + } + if has { + // Is valid block. + break + } + // Erase invalid (outdated due to rollbacks) record. + history = history[:i-recordSize] + } + if len(history) != historySize { + // Some records were removed, so we need to update the DB. + if err := s.Db.PutDirectly(historyKey, history); err != nil { + return nil, err + } + } + return history, nil +} + +func (s *AccountsStorage) AddressesNumber() (uint64, error) { + iter, err := s.Db.NewKeyIterator([]byte{BalanceKeyPrefix}) + if err != nil { + return 0, err + } + addressesNumber := uint64(0) + for iter.Next() { + balance, err := s.AccountBalance(iter.Key()) + if err != nil { + return 0, err + } + if balance > 0 { + addressesNumber++ + } + } + iter.Release() + if err := iter.Error(); err != nil { + return 0, err + } + return addressesNumber, nil +} + +func (s *AccountsStorage) AccountBalance(balanceKey []byte) (uint64, error) { + has, err := s.Db.Has(balanceKey) + if err != nil { + return 0, errors.Errorf("failed to check if balance key exists: %v\n", err) + } + if !has { + // TODO: think about this scenario. + return 0, nil + } + history, err := s.Db.Get(balanceKey) + if err != nil { + return 0, errors.Errorf("failed to get history for given key: %v\n", err) + } + // Delete invalid records. + history, err = s.filterHistory(balanceKey, history) + if err != nil { + return 0, errors.Errorf("failed to filter history: %v\n", err) + } + if len(history) == 0 { + // There were no valid records, so the history is empty after filtering. + return 0, nil + } + balanceEnd := len(history) - crypto.SignatureSize + balance := binary.LittleEndian.Uint64(history[balanceEnd-8 : balanceEnd]) + return balance, nil +} + +func (s *AccountsStorage) newHistory(newRecord []byte, key []byte, blockID crypto.Signature) ([]byte, error) { + has, err := s.Db.Has(key) + if err != nil { + return nil, err + } + if !has { + // New history. + return newRecord, nil + } + // Get current history. + history, err := s.Db.Get(key) + if err != nil { + return nil, err + } + // Delete invalid (because of rollback) records. + history, err = s.filterHistory(key, history) + if err != nil { + return nil, err + } + if s.rollbackMax != 0 { + // Remove records which are too far in the past. + history, err = s.cutHistory(key, history) + if err != nil { + return nil, err + } + } + if len(history) < recordSize { + // History is empty after filtering, new record is the first one. + return newRecord, nil + } + lastRecord := history[len(history)-recordSize:] + idBytes := lastRecord[len(lastRecord)-crypto.SignatureSize:] + lastBlockID, err := toBlockID(idBytes) + if err != nil { + return nil, err + } + if lastBlockID == blockID { + // If the last record is the same block, rewrite it. + copy(history[len(history)-recordSize:], newRecord) + } else { + // Append new record to the end. + history = append(history, newRecord...) + } + return history, nil +} + +func (s *AccountsStorage) SetAccountBalance(balanceKey []byte, balance uint64, blockID crypto.Signature) error { + // Add block to valid blocks. + key := BlockIdKey{BlockID: blockID} + if err := s.Db.Put(key.Bytes(), Empty); err != nil { + return err + } + // Prepare new record. + balanceBuf := make([]byte, 8) + binary.LittleEndian.PutUint64(balanceBuf, balance) + newRecord := append(balanceBuf, blockID[:]...) + // Add it to history with filtering. + history, err := s.newHistory(newRecord, balanceKey, blockID) + if err != nil { + return err + } + if err := s.Db.Put(balanceKey, history); err != nil { + return err + } + return nil +} + +func (s *AccountsStorage) RollbackBlock(blockID crypto.Signature) error { + // Decrease DB's height (for sync/recovery). + height, err := s.GetHeight() + if err != nil { + return err + } + if err := s.SetHeight(height-1, true); err != nil { + return err + } + key := BlockIdKey{BlockID: blockID} + if err := s.Db.Delete(key.Bytes()); err != nil { + return err + } + return nil +} + +func (s *AccountsStorage) FinishBlock() error { + // Increase DB's height (for sync/recovery). + height, err := s.GetHeight() + if err != nil { + return err + } + if err := s.SetHeight(height+1, false); err != nil { + return err + } + if err := s.Db.Flush(); err != nil { + return err + } + return nil +} diff --git a/pkg/storage/accountsstorage_test.go b/pkg/state/accountsstorage_test.go similarity index 60% rename from pkg/storage/accountsstorage_test.go rename to pkg/state/accountsstorage_test.go index 709eb1d6e..c9270eb9f 100644 --- a/pkg/storage/accountsstorage_test.go +++ b/pkg/state/accountsstorage_test.go @@ -1,4 +1,4 @@ -package storage +package state import ( "io/ioutil" @@ -12,40 +12,28 @@ import ( ) const ( - TOTAL_BLOCKS_NUMBER = 200 + totalBlocksNumber = 200 ) -func createAccountsStorage(blockIdsFile string) (*AccountsStorage, []string, error) { - res := make([]string, 3) +func createAccountsStorage() (*AccountsStorage, []string, error) { + res := make([]string, 1) dbDir0, err := ioutil.TempDir(os.TempDir(), "dbDir0") if err != nil { return nil, res, err } - globalStor, err := keyvalue.NewKeyVal(dbDir0, false) + globalStor, err := keyvalue.NewKeyVal(dbDir0, true) if err != nil { return nil, res, err } - dbDir1, err := ioutil.TempDir(os.TempDir(), "dbDir1") + genesis, err := crypto.NewSignatureFromBase58(genesisSignature) if err != nil { return nil, res, err } - addr2Index, err := keyvalue.NewKeyVal(dbDir1, false) + stor, err := NewAccountsStorage(genesis, globalStor) if err != nil { return nil, res, err } - dbDir2, err := ioutil.TempDir(os.TempDir(), "dbDir2") - if err != nil { - return nil, res, err - } - asset2Index, err := keyvalue.NewKeyVal(dbDir2, false) - if err != nil { - return nil, res, err - } - stor, err := NewAccountsStorage(globalStor, addr2Index, asset2Index, blockIdsFile) - if err != nil { - return nil, res, err - } - res = []string{dbDir0, dbDir1, dbDir2} + res = []string{dbDir0} return stor, res, nil } @@ -74,12 +62,15 @@ func genBlockID(fillWith byte) crypto.Signature { } func TestBalances(t *testing.T) { - stor, path, err := createAccountsStorage("") + stor, path, err := createAccountsStorage() if err != nil { t.Fatalf("Can not create AccountsStorage: %v\n", err) } defer func() { + if err := stor.Db.Close(); err != nil { + t.Fatalf("Failed to close DB: %v", err) + } if err := util.CleanTemporaryDirs(path); err != nil { t.Fatalf("Failed to clean test data dirs: %v", err) } @@ -89,10 +80,14 @@ func TestBalances(t *testing.T) { balance := uint64(100) blockID := genBlockID(0) addr := genAddr(1) - if err := stor.SetAccountBalance(addr, nil, balance, blockID); err != nil { + key := BalanceKey{Address: addr} + if err := stor.SetAccountBalance(key.Bytes(), balance, blockID); err != nil { t.Fatalf("Faied to set account balance:%v\n", err) } - newBalance, err := stor.AccountBalance(addr, nil) + if err := stor.Db.Flush(); err != nil { + t.Fatalf("Failed to flush DB: %v\n", err) + } + newBalance, err := stor.AccountBalance(key.Bytes()) if err != nil { t.Fatalf("Failed to retrieve account balance: %v\n", err) } @@ -101,10 +96,13 @@ func TestBalances(t *testing.T) { } // Set balance in same block. balance = 2500 - if err := stor.SetAccountBalance(addr, nil, balance, blockID); err != nil { + if err := stor.SetAccountBalance(key.Bytes(), balance, blockID); err != nil { t.Fatalf("Faied to set account balance:%v\n", err) } - newBalance, err = stor.AccountBalance(addr, nil) + if err := stor.Db.Flush(); err != nil { + t.Fatalf("Failed to flush DB: %v\n", err) + } + newBalance, err = stor.AccountBalance(key.Bytes()) if err != nil { t.Fatalf("Failed to retrieve account balance: %v\n", err) } @@ -114,10 +112,13 @@ func TestBalances(t *testing.T) { // Set balance in new block. balance = 10 blockID = genBlockID(1) - if err := stor.SetAccountBalance(addr, nil, balance, blockID); err != nil { + if err := stor.SetAccountBalance(key.Bytes(), balance, blockID); err != nil { t.Fatalf("Faied to set account balance:%v\n", err) } - newBalance, err = stor.AccountBalance(addr, nil) + if err := stor.Db.Flush(); err != nil { + t.Fatalf("Failed to flush DB: %v\n", err) + } + newBalance, err = stor.AccountBalance(key.Bytes()) if err != nil { t.Fatalf("Failed to retrieve account balance: %v\n", err) } @@ -127,12 +128,15 @@ func TestBalances(t *testing.T) { } func TestRollbackBlock(t *testing.T) { - stor, path, err := createAccountsStorage("") + stor, path, err := createAccountsStorage() if err != nil { t.Fatalf("Can not create AccountsStorage: %v\n", err) } defer func() { + if err := stor.Db.Close(); err != nil { + t.Fatalf("Failed to close DB: %v", err) + } if err := util.CleanTemporaryDirs(path); err != nil { t.Fatalf("Failed to clean test data dirs: %v", err) } @@ -141,28 +145,37 @@ func TestRollbackBlock(t *testing.T) { addr0 := genAddr(0) addr1 := genAddr(1) asset1 := genAsset(1) - for i := 0; i < TOTAL_BLOCKS_NUMBER; i++ { + for i := 0; i < totalBlocksNumber; i++ { blockID := genBlockID(byte(i)) - if err := stor.SetAccountBalance(addr0, nil, uint64(i), blockID); err != nil { + key := BalanceKey{Address: addr0} + if err := stor.SetAccountBalance(key.Bytes(), uint64(i), blockID); err != nil { t.Fatalf("Faied to set account balance: %v\n", err) } - if err := stor.SetAccountBalance(addr1, nil, uint64(i/2), blockID); err != nil { + key = BalanceKey{Address: addr1} + if err := stor.SetAccountBalance(key.Bytes(), uint64(i/2), blockID); err != nil { t.Fatalf("Faied to set account balance: %v\n", err) } - if err := stor.SetAccountBalance(addr1, asset1, uint64(i/3), blockID); err != nil { + key = BalanceKey{Address: addr1, Asset: asset1} + if err := stor.SetAccountBalance(key.Bytes(), uint64(i/3), blockID); err != nil { t.Fatalf("Faied to set account balance: %v\n", err) } + if err := stor.FinishBlock(); err != nil { + t.Fatalf("FinishBlock(): %v\n", err) + } } - for i := TOTAL_BLOCKS_NUMBER - 1; i > 0; i-- { - balance0, err := stor.AccountBalance(addr0, nil) + for i := totalBlocksNumber - 1; i > 0; i-- { + key := BalanceKey{Address: addr0} + balance0, err := stor.AccountBalance(key.Bytes()) if err != nil { t.Fatalf("Failed to retrieve account balance: %v\n", err) } - balance1, err := stor.AccountBalance(addr1, nil) + key = BalanceKey{Address: addr1} + balance1, err := stor.AccountBalance(key.Bytes()) if err != nil { t.Fatalf("Failed to retrieve account balance: %v\n", err) } - asset1Balance, err := stor.AccountBalance(addr1, asset1) + key = BalanceKey{Address: addr1, Asset: asset1} + asset1Balance, err := stor.AccountBalance(key.Bytes()) if err != nil { t.Fatalf("Failed to retrieve account balance: %v\n", err) } @@ -179,7 +192,7 @@ func TestRollbackBlock(t *testing.T) { // Undo block. blockID := genBlockID(byte(i)) if err := stor.RollbackBlock(blockID); err != nil { - t.Fatalf("Failed to rollback block: %v\n", err) + t.Fatalf("Failed to rollback block: %v %d\n", err, i) } } } diff --git a/pkg/storage/blockreadwriter.go b/pkg/state/blockreadwriter.go similarity index 61% rename from pkg/storage/blockreadwriter.go rename to pkg/state/blockreadwriter.go index 2c196f277..4f9f37040 100644 --- a/pkg/storage/blockreadwriter.go +++ b/pkg/state/blockreadwriter.go @@ -1,4 +1,4 @@ -package storage +package state import ( "bufio" @@ -10,11 +10,11 @@ import ( "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/keyvalue" + "github.com/wavesplatform/gowaves/pkg/proto" ) type BlockReadWriter struct { - // keyvalue.KeyValue to store ID --> offset in blockchain for blocks and transactions. - idKeyVal keyvalue.KeyValue + Db keyvalue.KeyValue // Series of transactions. blockchain *os.File @@ -22,8 +22,6 @@ type BlockReadWriter struct { headers *os.File // Height is used as index for block IDs. blockHeight2ID *os.File - // IDs of transactions. - txIDs *os.File blockchainBuf *bufio.Writer @@ -31,38 +29,46 @@ type BlockReadWriter struct { txBounds []byte headerBounds []byte heightBuf []byte - txNumberBuf []byte // offsetEnd is common for headers and the blockchain, since the limit for any offset length is 8 bytes. offsetEnd uint64 blockchainLen, headersLen uint64 - height uint64 // Total number of transactions. - txNumber uint64 offsetLen, headerOffsetLen int mtx sync.RWMutex } func openOrCreate(path string) (*os.File, uint64, error) { - if _, err := os.Stat(path); err == nil { - file, err := os.Open(path) - if err != nil { - return nil, 0, err - } - stat, err := os.Stat(path) - if err != nil { - return nil, 0, err + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755) + if err != nil { + return nil, 0, err + } + stat, err := os.Stat(path) + if err != nil { + return nil, 0, err + } + return file, uint64(stat.Size()), nil +} + +func initHeight(db keyvalue.KeyValue) (uint64, error) { + has, err := db.Has([]byte{RwHeightKeyPrefix}) + if err != nil { + return 0, err + } + if !has { + heightBuf := make([]byte, 8) + binary.LittleEndian.PutUint64(heightBuf, 0) + if err := db.PutDirectly([]byte{RwHeightKeyPrefix}, heightBuf); err != nil { + return 0, err } - return file, uint64(stat.Size()), nil - } else if os.IsNotExist(err) { - file, err := os.Create(path) + return 0, nil + } else { + heightBytes, err := db.Get([]byte{RwHeightKeyPrefix}) if err != nil { - return nil, 0, err + return 0, err } - return file, 0, nil - } else { - return nil, 0, err + return binary.LittleEndian.Uint64(heightBytes), nil } } @@ -75,11 +81,7 @@ func NewBlockReadWriter(dir string, offsetLen, headerOffsetLen int, keyVal keyva if err != nil { return nil, err } - blockHeight2ID, blockHeight2IDSize, err := openOrCreate(path.Join(dir, "block_height_to_id")) - if err != nil { - return nil, err - } - txIDs, txIDsSize, err := openOrCreate(path.Join(dir, "tx_ids")) + blockHeight2ID, _, err := openOrCreate(path.Join(dir, "block_height_to_id")) if err != nil { return nil, err } @@ -89,33 +91,61 @@ func NewBlockReadWriter(dir string, offsetLen, headerOffsetLen int, keyVal keyva if headerOffsetLen > 8 { return nil, errors.New("headerOffsetLen is too large") } + if _, err := initHeight(keyVal); err != nil { + return nil, err + } return &BlockReadWriter{ - idKeyVal: keyVal, + Db: keyVal, blockchain: blockchain, headers: headers, blockHeight2ID: blockHeight2ID, - txIDs: txIDs, blockchainBuf: bufio.NewWriter(blockchain), txBounds: make([]byte, offsetLen*2), headerBounds: make([]byte, headerOffsetLen*2), blockBounds: make([]byte, offsetLen*2), heightBuf: make([]byte, 8), - txNumberBuf: make([]byte, 8), offsetEnd: uint64(1< %d", rw.blockchainLen, rw.offsetEnd) } binary.LittleEndian.PutUint64(rw.txBounds[rw.offsetLen:], rw.blockchainLen) - if err := rw.idKeyVal.Put(txID, rw.txBounds); err != nil { - return err - } - if _, err := rw.txIDs.Write(txID); err != nil { + key := TxOffsetKey{TxID: txID} + if err := rw.Db.Put(key.Bytes(), rw.txBounds); err != nil { return err } - rw.txNumber++ - binary.LittleEndian.PutUint64(rw.txNumberBuf, rw.txNumber) return nil } @@ -199,24 +229,28 @@ func (rw *BlockReadWriter) BlockIDByHeight(height uint64) (crypto.Signature, err func (rw *BlockReadWriter) HeightByBlockID(blockID crypto.Signature) (uint64, error) { rw.mtx.RLock() defer rw.mtx.RUnlock() - blockInfo, err := rw.idKeyVal.Get(blockID[:]) + key := BlockOffsetKey{BlockID: blockID} + blockInfo, err := rw.Db.Get(key.Bytes()) if err != nil { return 0, err } - height := binary.LittleEndian.Uint64(blockInfo[len(blockInfo)-16 : len(blockInfo)-8]) + height := binary.LittleEndian.Uint64(blockInfo[len(blockInfo)-8:]) return height, nil } -func (rw *BlockReadWriter) CurrentHeight() uint64 { - rw.mtx.RLock() - defer rw.mtx.RUnlock() - return rw.height +func (rw *BlockReadWriter) CurrentHeight() (uint64, error) { + height, err := rw.GetHeight() + if err != nil { + return 0, err + } + return height, nil } func (rw *BlockReadWriter) ReadTransaction(txID []byte) ([]byte, error) { rw.mtx.RLock() defer rw.mtx.RUnlock() - txBounds, err := rw.idKeyVal.Get(txID) + key := TxOffsetKey{TxID: txID} + txBounds, err := rw.Db.Get(key.Bytes()) if err != nil { return nil, err } @@ -235,11 +269,12 @@ func (rw *BlockReadWriter) ReadTransaction(txID []byte) ([]byte, error) { func (rw *BlockReadWriter) ReadBlockHeader(blockID crypto.Signature) ([]byte, error) { rw.mtx.RLock() defer rw.mtx.RUnlock() - blockInfo, err := rw.idKeyVal.Get(blockID[:]) + key := BlockOffsetKey{BlockID: blockID} + blockInfo, err := rw.Db.Get(key.Bytes()) if err != nil { return nil, err } - headerBounds := blockInfo[rw.offsetLen*2 : len(blockInfo)-16] + headerBounds := blockInfo[rw.offsetLen*2 : len(blockInfo)-8] headerStart := binary.LittleEndian.Uint64(headerBounds[:rw.headerOffsetLen]) headerEnd := binary.LittleEndian.Uint64(headerBounds[rw.headerOffsetLen:]) headerBytes := make([]byte, headerEnd-headerStart) @@ -255,7 +290,8 @@ func (rw *BlockReadWriter) ReadBlockHeader(blockID crypto.Signature) ([]byte, er func (rw *BlockReadWriter) ReadTransactionsBlock(blockID crypto.Signature) ([]byte, error) { rw.mtx.RLock() defer rw.mtx.RUnlock() - blockInfo, err := rw.idKeyVal.Get(blockID[:]) + key := BlockOffsetKey{BlockID: blockID} + blockInfo, err := rw.Db.Get(key.Bytes()) if err != nil { return nil, err } @@ -272,10 +308,14 @@ func (rw *BlockReadWriter) ReadTransactionsBlock(blockID crypto.Signature) ([]by return blockBytes, nil } -func (rw *BlockReadWriter) cleanIDs(newHeight, newTxNumber uint64) error { +func (rw *BlockReadWriter) cleanIDs(oldHeight, newBlockchainLen uint64) error { + newHeight, err := rw.GetHeight() + if err != nil { + return err + } // Clean block IDs. - offset := rw.height - blocksIdsToRemove := int(rw.height - newHeight) + offset := oldHeight + blocksIdsToRemove := int(oldHeight - newHeight) for i := 0; i < blocksIdsToRemove; i++ { readPos := int64((offset - 1) * crypto.SignatureSize) idBytes := make([]byte, crypto.SignatureSize) @@ -284,94 +324,103 @@ func (rw *BlockReadWriter) cleanIDs(newHeight, newTxNumber uint64) error { } else if n != crypto.SignatureSize { return errors.New("cleanIDs(): invalid id size") } - if err := rw.idKeyVal.Delete(idBytes); err != nil { + blockID, err := toBlockID(idBytes) + if err != nil { + return err + } + key := BlockOffsetKey{BlockID: blockID} + if err := rw.Db.Delete(key.Bytes()); err != nil { return err } offset-- } // Clean transaction IDs. - offset = rw.txNumber - txIdsToRemove := int(rw.txNumber - newTxNumber) - for i := 0; i < txIdsToRemove; i++ { - readPos := int64((offset - 1) * crypto.DigestSize) - idBytes := make([]byte, crypto.DigestSize) - if n, err := rw.txIDs.ReadAt(idBytes, readPos); err != nil { + readPos := newBlockchainLen + for readPos < rw.blockchainLen { + txSizeBytes := make([]byte, 4) + if _, err := rw.blockchain.ReadAt(txSizeBytes, int64(readPos)); err != nil { return err - } else if n != crypto.DigestSize { - return errors.New("cleanIDs(): invalid id size") } - if err := rw.idKeyVal.Delete(idBytes); err != nil { + txSize := binary.BigEndian.Uint32(txSizeBytes) + readPos += 4 + txBytes := make([]byte, txSize) + if _, err := rw.blockchain.ReadAt(txBytes, int64(readPos)); err != nil { + return err + } + readPos += uint64(txSize) + tx, err := proto.BytesToTransaction(txBytes) + if err != nil { + return err + } + key := TxOffsetKey{TxID: tx.GetID()} + if err := rw.Db.Delete(key.Bytes()); err != nil { return err } - offset-- - } - // Remove blockIDs from blockHeight2ID file. - newOffset := int64(newHeight * crypto.SignatureSize) - if err := rw.blockHeight2ID.Truncate(newOffset); err != nil { - return err - } - if _, err := rw.blockHeight2ID.Seek(newOffset, 0); err != nil { - return err - } - // Remove txIDs from txIDs file. - newOffset = int64(newTxNumber * crypto.DigestSize) - if err := rw.txIDs.Truncate(newOffset); err != nil { - return err - } - if _, err := rw.txIDs.Seek(newOffset, 0); err != nil { - return err } return nil } -func (rw *BlockReadWriter) RemoveBlocks(removalEdge crypto.Signature) error { +func (rw *BlockReadWriter) Rollback(removalEdge crypto.Signature, cleanIDs bool) error { rw.mtx.Lock() defer rw.mtx.Unlock() - blockInfo, err := rw.idKeyVal.Get(removalEdge[:]) + key := BlockOffsetKey{BlockID: removalEdge} + blockInfo, err := rw.Db.Get(key.Bytes()) if err != nil { return err } - // Remove transactions. + newHeight := binary.LittleEndian.Uint64(blockInfo[len(blockInfo)-8:]) + 1 + // Set new height first of all. + oldHeight, err := rw.GetHeight() + if err != nil { + return err + } + if oldHeight < newHeight { + return errors.New("new height is greater than current height") + } + if err := rw.SetHeight(newHeight, true); err != nil { + return err + } blockBounds := blockInfo[:rw.offsetLen*2] - blockEnd := int64(binary.LittleEndian.Uint64(blockBounds[rw.offsetLen:])) - if err := rw.blockchain.Truncate(blockEnd); err != nil { + blockEnd := binary.LittleEndian.Uint64(blockBounds[rw.offsetLen:]) + if cleanIDs { + // Clean IDs of blocks and transactions. + if err := rw.cleanIDs(oldHeight, blockEnd); err != nil { + return err + } + } + // Remove transactions. + if err := rw.blockchain.Truncate(int64(blockEnd)); err != nil { return err } - if _, err := rw.blockchain.Seek(blockEnd, 0); err != nil { + if _, err := rw.blockchain.Seek(int64(blockEnd), 0); err != nil { return err } // Remove headers. - headerBounds := blockInfo[rw.offsetLen*2 : len(blockInfo)-16] - headerEnd := int64(binary.LittleEndian.Uint64(headerBounds[rw.headerOffsetLen:])) - if err := rw.headers.Truncate(headerEnd); err != nil { + headerBounds := blockInfo[rw.offsetLen*2 : len(blockInfo)-8] + headerEnd := binary.LittleEndian.Uint64(headerBounds[rw.headerOffsetLen:]) + if err := rw.headers.Truncate(int64(headerEnd)); err != nil { return err } - if _, err := rw.headers.Seek(headerEnd, 0); err != nil { + if _, err := rw.headers.Seek(int64(headerEnd), 0); err != nil { return err } - newHeight := binary.LittleEndian.Uint64(blockInfo[len(blockInfo)-16:len(blockInfo)-8]) + 1 - newTxNumber := binary.LittleEndian.Uint64(blockInfo[len(blockInfo)-8:]) + 1 - // Clean IDs of blocks and transactions. - if err := rw.cleanIDs(newHeight, newTxNumber); err != nil { - return nil + // Remove blockIDs from blockHeight2ID file. + newOffset := int64(newHeight * crypto.SignatureSize) + if err := rw.blockHeight2ID.Truncate(newOffset); err != nil { + return err + } + if _, err := rw.blockHeight2ID.Seek(newOffset, 0); err != nil { + return err } // Decrease counters. - rw.blockchainLen = uint64(blockEnd) - rw.headersLen = uint64(headerEnd) - rw.height = newHeight - rw.txNumber = newTxNumber - // Reset buffer. + rw.blockchainLen = blockEnd + rw.headersLen = headerEnd + // Reset buffers. rw.blockchainBuf.Reset(rw.blockchain) return nil } func (rw *BlockReadWriter) Close() error { - if err := rw.idKeyVal.Flush(); err != nil { - return err - } - if err := rw.blockchainBuf.Flush(); err != nil { - return err - } if err := rw.blockchain.Close(); err != nil { return err } @@ -381,8 +430,5 @@ func (rw *BlockReadWriter) Close() error { if err := rw.blockHeight2ID.Close(); err != nil { return err } - if err := rw.txIDs.Close(); err != nil { - return err - } return nil } diff --git a/pkg/storage/blockreadwriter_test.go b/pkg/state/blockreadwriter_test.go similarity index 91% rename from pkg/storage/blockreadwriter_test.go rename to pkg/state/blockreadwriter_test.go index 738a704e9..1ff5eb72d 100644 --- a/pkg/storage/blockreadwriter_test.go +++ b/pkg/state/blockreadwriter_test.go @@ -1,4 +1,4 @@ -package storage +package state import ( "bufio" @@ -10,7 +10,6 @@ import ( "io/ioutil" "os" "path/filepath" - "runtime" "sync" "testing" "time" @@ -23,9 +22,9 @@ import ( ) const ( - TASKS_CHAN_BUFFER_SIZE = 20 - READERS_NUMBER = 20 - BLOCKS_NUMBER = 9900 + tasksChanBufferSize = 20 + readersNumber = 5 + blocksNumber = 1000 ) var ( @@ -49,14 +48,6 @@ type ReadTask struct { CorrectResult []byte } -func getLocalDir() (string, error) { - _, filename, _, ok := runtime.Caller(0) - if !ok { - return "", errors.Errorf("Unable to find current package file") - } - return filepath.Dir(filename), nil -} - func readRealBlocks(t *testing.T, nBlocks int) ([]*proto.Block, error) { if len(cached_blocks) >= nBlocks { return cached_blocks[:nBlocks], nil @@ -152,6 +143,9 @@ func writeBlock(t *testing.T, rw *BlockReadWriter, block *proto.Block) { if err := rw.FinishBlock(blockID); err != nil { t.Fatalf("FinishBlock(): %v", err) } + if err := rw.Db.Flush(); err != nil { + t.Fatalf("Failed to flush DB: %v", err) + } } func testSingleBlock(t *testing.T, rw *BlockReadWriter, block *proto.Block) { @@ -220,6 +214,10 @@ func writeBlocks(ctx context.Context, rw *BlockReadWriter, blocks []*proto.Block close(readTasks) return err } + if err := rw.Db.Flush(); err != nil { + close(readTasks) + return err + } task = &ReadTask{Type: ReadBlock, BlockID: blockID, CorrectResult: block.Transactions} tasksBuf = append(tasksBuf, task) for _, task := range tasksBuf { @@ -286,12 +284,15 @@ func TestSimpleReadWrite(t *testing.T) { if err := rw.Close(); err != nil { t.Fatalf("Failed to close BlockReadWriter: %v", err) } + if err := rw.Db.Close(); err != nil { + t.Fatalf("Failed to close DB: %v", err) + } if err := util.CleanTemporaryDirs(path); err != nil { t.Fatalf("Failed to clean test data dirs: %v", err) } }() - blocks, err := readRealBlocks(t, BLOCKS_NUMBER) + blocks, err := readRealBlocks(t, blocksNumber) if err != nil { t.Fatalf("Can not read blocks from blockchain file: %v", err) } @@ -310,12 +311,15 @@ func TestSimultaneousReadWrite(t *testing.T) { if err := rw.Close(); err != nil { t.Fatalf("Failed to close BlockReadWriter: %v", err) } + if err := rw.Db.Close(); err != nil { + t.Fatalf("Failed to close DB: %v", err) + } if err := util.CleanTemporaryDirs(path); err != nil { t.Fatalf("Failed to clean test data dirs: %v", err) } }() - blocks, err := readRealBlocks(t, BLOCKS_NUMBER) + blocks, err := readRealBlocks(t, blocksNumber) if err != nil { t.Fatalf("Can not read blocks from blockchain file: %v", err) } @@ -323,7 +327,7 @@ func TestSimultaneousReadWrite(t *testing.T) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) errCounter := 0 - readTasks := make(chan *ReadTask, TASKS_CHAN_BUFFER_SIZE) + readTasks := make(chan *ReadTask, tasksChanBufferSize) wg.Add(1) go func() { defer wg.Done() @@ -336,7 +340,7 @@ func TestSimultaneousReadWrite(t *testing.T) { cancel() } }() - for i := 0; i < READERS_NUMBER; i++ { + for i := 0; i < readersNumber; i++ { wg.Add(1) go func() { defer wg.Done() @@ -366,12 +370,15 @@ func TestSimultaneousReadDelete(t *testing.T) { if err := rw.Close(); err != nil { t.Fatalf("Failed to close BlockReadWriter: %v", err) } + if err := rw.Db.Close(); err != nil { + t.Fatalf("Failed to close DB: %v", err) + } if err := util.CleanTemporaryDirs(path); err != nil { t.Fatalf("Failed to clean test data dirs: %v", err) } }() - blocks, err := readRealBlocks(t, BLOCKS_NUMBER) + blocks, err := readRealBlocks(t, blocksNumber) if err != nil { t.Fatalf("Can not read blocks from blockchain file: %v", err) } @@ -379,8 +386,8 @@ func TestSimultaneousReadDelete(t *testing.T) { for _, block := range blocks { writeBlock(t, rw, block) } - idToTest := blocks[BLOCKS_NUMBER-1].BlockSignature - prevId := blocks[BLOCKS_NUMBER-2].BlockSignature + idToTest := blocks[blocksNumber-1].BlockSignature + prevId := blocks[blocksNumber-2].BlockSignature var wg sync.WaitGroup var removeErr error @@ -389,7 +396,7 @@ func TestSimultaneousReadDelete(t *testing.T) { defer wg.Done() // Give some time to start reading before deleting. time.Sleep(time.Second) - removeErr = rw.RemoveBlocks(prevId) + removeErr = rw.Rollback(prevId, true) }() for { _, err = rw.ReadBlockHeader(idToTest) diff --git a/pkg/state/genesis.go b/pkg/state/genesis.go index 61d4719c2..5104b9bb8 100644 --- a/pkg/state/genesis.go +++ b/pkg/state/genesis.go @@ -6,7 +6,7 @@ import ( ) const ( - GENESIS_SIGNATURE = "FSH8eAAzZNqnG8xgTZtz5xuLqXySsXgAjmFEC25hXMbEufiGjqWPnGCZFt6gLiVLJny16ipxRNAkkzjjhqTjBE2" + genesisSignature = "FSH8eAAzZNqnG8xgTZtz5xuLqXySsXgAjmFEC25hXMbEufiGjqWPnGCZFt6gLiVLJny16ipxRNAkkzjjhqTjBE2" ) var ( diff --git a/pkg/state/keys.go b/pkg/state/keys.go new file mode 100644 index 000000000..33be8a2e4 --- /dev/null +++ b/pkg/state/keys.go @@ -0,0 +1,77 @@ +package state + +import ( + "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +const ( + // Balances (main state). + BalanceKeyPrefix byte = iota + + // Valid block IDs. + BlockIdKeyPrefix + + // For block storage. + // IDs of blocks and transactions --> offsets in files. + BlockOffsetKeyPrefix + TxOffsetKeyPrefix + + // Min height of BlockReadWriter's files. + RwHeightKeyPrefix + // Height of main db. + DbHeightKeyPrefix +) + +type BalanceKey struct { + Address proto.Address + Asset []byte +} + +func (k *BalanceKey) Bytes() []byte { + if k.Asset != nil { + buf := make([]byte, 1+proto.AddressSize+crypto.DigestSize) + buf[0] = BalanceKeyPrefix + copy(buf[1:], k.Address[:]) + copy(buf[1+proto.AddressSize:], k.Asset) + return buf + } else { + buf := make([]byte, 1+proto.AddressSize) + buf[0] = BalanceKeyPrefix + copy(buf[1:], k.Address[:]) + return buf + } +} + +type BlockIdKey struct { + BlockID crypto.Signature +} + +func (k *BlockIdKey) Bytes() []byte { + buf := make([]byte, 1+crypto.SignatureSize) + buf[0] = BlockIdKeyPrefix + copy(buf[1:], k.BlockID[:]) + return buf +} + +type BlockOffsetKey struct { + BlockID crypto.Signature +} + +func (k *BlockOffsetKey) Bytes() []byte { + buf := make([]byte, 1+crypto.SignatureSize) + buf[0] = BlockOffsetKeyPrefix + copy(buf[1:], k.BlockID[:]) + return buf +} + +type TxOffsetKey struct { + TxID []byte +} + +func (k *TxOffsetKey) Bytes() []byte { + buf := make([]byte, 1+crypto.DigestSize) + buf[0] = TxOffsetKeyPrefix + copy(buf[1:], k.TxID) + return buf +} diff --git a/pkg/state/state.go b/pkg/state/state.go index 36f3e3fc9..41293c8a1 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -9,21 +9,82 @@ import ( "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/keyvalue" "github.com/wavesplatform/gowaves/pkg/proto" - "github.com/wavesplatform/gowaves/pkg/storage" ) const ( - BLOCKS_STOR_DIR = "blocks_storage" - BLOCKS_STOR_KEYVAL_DIR = "blocks_storage_keyvalue" - ACCOUNTS_STOR_GLOBAL_DIR = "accounts_stor_global" - ACCOUNTS_STOR_ADDR_DIR = "accounts_stor_addr" - ACCOUNTS_STOR_ASSET_DIR = "accounts_stor_assets" + rollbackMaxBlocks = 4000 + blocksStorDir = "blocks_storage" + keyvalueDir = "keyvalue" ) +type WavesBalanceKey [1 + proto.AddressSize]byte +type AssetBalanceKey [1 + proto.AddressSize + crypto.DigestSize]byte + +type BalancesStorage struct { + global *AccountsStorage + assets map[AssetBalanceKey]uint64 + waves map[WavesBalanceKey]uint64 +} + +func NewBalancesStorage(global *AccountsStorage) (*BalancesStorage, error) { + return &BalancesStorage{ + global: global, + assets: make(map[AssetBalanceKey]uint64), + waves: make(map[WavesBalanceKey]uint64), + }, nil +} + +func (stor *BalancesStorage) AccountBalance(key []byte) (uint64, error) { + size := len(key) + if size == 1+proto.AddressSize { + var wavesKey WavesBalanceKey + copy(wavesKey[:], key) + _, ok := stor.waves[wavesKey] + if !ok { + balance, err := stor.global.AccountBalance(key) + if err != nil { + return 0, err + } + stor.waves[wavesKey] = balance + } + return stor.waves[wavesKey], nil + } else if size == 1+proto.AddressSize+crypto.DigestSize { + var assetKey AssetBalanceKey + copy(assetKey[:], key) + _, ok := stor.assets[assetKey] + if !ok { + balance, err := stor.global.AccountBalance(key) + if err != nil { + return 0, err + } + stor.assets[assetKey] = balance + } + return stor.assets[assetKey], nil + } + return 0, errors.New("invalid key size") +} + +func (stor *BalancesStorage) SetAccountBalance(key []byte, balance uint64) error { + size := len(key) + if size == 1+proto.AddressSize { + var wavesKey WavesBalanceKey + copy(wavesKey[:], key) + stor.waves[wavesKey] = balance + } else if size == 1+proto.AddressSize+crypto.DigestSize { + var assetKey AssetBalanceKey + copy(assetKey[:], key) + stor.assets[assetKey] = balance + } else { + return errors.New("invalid key size") + } + return nil +} + type StateManager struct { genesis crypto.Signature - accountsStorage *storage.AccountsStorage - rw *storage.BlockReadWriter + db keyvalue.KeyValue + accountsStorage *AccountsStorage + rw *BlockReadWriter } type BlockStorageParams struct { @@ -34,43 +95,68 @@ func DefaultBlockStorageParams() BlockStorageParams { return BlockStorageParams{OffsetLen: 8, HeaderOffsetLen: 8} } +func syncDbAndStorage(db keyvalue.KeyValue, stor *AccountsStorage, rw *BlockReadWriter) error { + dbHeightBytes, err := db.Get([]byte{DbHeightKeyPrefix}) + if err != nil { + return err + } + dbHeight := binary.LittleEndian.Uint64(dbHeightBytes) + rwHeighBytes, err := db.Get([]byte{RwHeightKeyPrefix}) + if err != nil { + return err + } + rwHeight := binary.LittleEndian.Uint64(rwHeighBytes) + if rwHeight < dbHeight { + // This should never happen, because we update block storage before writing changes into DB. + panic("Impossible to sync: DB is ahead of block storage; remove data dir and restart the node.") + } + if dbHeight > 0 { + last, err := rw.BlockIDByHeight(dbHeight - 1) + if err != nil { + return err + } + if err := rw.Rollback(last, false); err != nil { + return errors.Errorf("failed to remove blocks from block storage: %v", err) + } + } + return nil +} + func NewStateManager(dataDir string, params BlockStorageParams) (*StateManager, error) { - genesis, err := crypto.NewSignatureFromBase58(GENESIS_SIGNATURE) + genesis, err := crypto.NewSignatureFromBase58(genesisSignature) if err != nil { - return nil, errors.Errorf("Failed to get genesis signature from string: %v\n", err) + return nil, errors.Errorf("failed to get genesis signature from string: %v\n", err) } - blockStorageKeyValDir := filepath.Join(dataDir, BLOCKS_STOR_KEYVAL_DIR) - blockStorageKeyVal, err := keyvalue.NewKeyVal(blockStorageKeyValDir, true) - blockStorageDir := filepath.Join(dataDir, BLOCKS_STOR_DIR) + blockStorageDir := filepath.Join(dataDir, blocksStorDir) if _, err := os.Stat(blockStorageDir); os.IsNotExist(err) { if err := os.Mkdir(blockStorageDir, 0755); err != nil { - return nil, errors.Errorf("Failed to create blocks directory: %v\n", err) + return nil, errors.Errorf("failed to create blocks directory: %v\n", err) } } - rw, err := storage.NewBlockReadWriter(blockStorageDir, params.OffsetLen, params.HeaderOffsetLen, blockStorageKeyVal) + dbDir := filepath.Join(dataDir, keyvalueDir) + db, err := keyvalue.NewKeyVal(dbDir, true) + rw, err := NewBlockReadWriter(blockStorageDir, params.OffsetLen, params.HeaderOffsetLen, db) if err != nil { - return nil, errors.Errorf("Failed to create block storage: %v\n", err) - } - dbDir0 := filepath.Join(dataDir, ACCOUNTS_STOR_GLOBAL_DIR) - globalStor, err := keyvalue.NewKeyVal(dbDir0, false) - dbDir1 := filepath.Join(dataDir, ACCOUNTS_STOR_ASSET_DIR) - addr2Index, err := keyvalue.NewKeyVal(dbDir1, false) - dbDir2 := filepath.Join(dataDir, ACCOUNTS_STOR_ADDR_DIR) - asset2Index, err := keyvalue.NewKeyVal(dbDir2, false) - idsFile, err := rw.BlockIdsFilePath() - if err != nil { - return nil, errors.Errorf("failed to get block ids file's path: %v\n", err) + return nil, errors.Errorf("failed to create block storage: %v\n", err) } - accountsStor, err := storage.NewAccountsStorage(globalStor, addr2Index, asset2Index, idsFile) + accountsStor, err := NewAccountsStorage(genesis, db) if err != nil { return nil, errors.Errorf("failed to create accounts storage: %v\n", err) } - state := &StateManager{genesis: genesis, accountsStorage: accountsStor, rw: rw} + accountsStor.SetRollbackMax(rollbackMaxBlocks, rw) + if err := syncDbAndStorage(db, accountsStor, rw); err != nil { + return nil, errors.Errorf("failed to sync block storage and DB: %v\n", err) + } + state := &StateManager{genesis: genesis, db: db, accountsStorage: accountsStor, rw: rw} return state, nil } func (s *StateManager) applyGenesis() error { - tv, err := proto.NewTransactionValidator(s.genesis, s.accountsStorage) + balancesStor, err := NewBalancesStorage(s.accountsStorage) + if err != nil { + return err + } + tv, err := NewTransactionValidator(s.genesis, balancesStor) if err != nil { return err } @@ -82,10 +168,18 @@ func (s *StateManager) applyGenesis() error { if err := tv.ValidateTransaction(s.genesis, &tx, true); err != nil { return errors.Wrap(err, "invalid genesis transaction") } - if err := s.performGenesisTransaction(tx); err != nil { + if err := s.performGenesisTransaction(tx, balancesStor); err != nil { return errors.Wrap(err, "failed to perform genesis transaction") } } + // Write transactions from local balances storage into DB batch. + if err := s.addChangesToBatch(balancesStor, s.genesis); err != nil { + return err + } + // Write batch to DB. + if err := s.db.Flush(); err != nil { + return err + } return nil } @@ -116,7 +210,7 @@ func (s *StateManager) GetBlockByHeight(height uint64) (*proto.Block, error) { } func (s *StateManager) Height() (uint64, error) { - return s.rw.CurrentHeight(), nil + return s.rw.CurrentHeight() } func (s *StateManager) BlockIDToHeight(blockID crypto.Signature) (uint64, error) { @@ -128,27 +222,28 @@ func (s *StateManager) HeightToBlockID(height uint64) (crypto.Signature, error) } func (s *StateManager) AccountBalance(addr proto.Address, asset []byte) (uint64, error) { - return s.accountsStorage.AccountBalance(addr, asset) + key := BalanceKey{Address: addr, Asset: asset} + return s.accountsStorage.AccountBalance(key.Bytes()) } -func (s *StateManager) WavesAddressesNumber() (uint64, error) { - return s.accountsStorage.WavesAddressesNumber() +func (s *StateManager) AddressesNumber() (uint64, error) { + return s.accountsStorage.AddressesNumber() } -func (s *StateManager) performGenesisTransaction(tx proto.Genesis) error { - receiverBalance, err := s.accountsStorage.AccountBalance(tx.Recipient, nil) +func (s *StateManager) performGenesisTransaction(tx proto.Genesis, stor *BalancesStorage) error { + key := BalanceKey{Address: tx.Recipient} + receiverBalance, err := stor.AccountBalance(key.Bytes()) if err != nil { return err } newReceiverBalance := receiverBalance + tx.Amount - if err := s.accountsStorage.SetAccountBalance(tx.Recipient, nil, newReceiverBalance, s.genesis); err != nil { + if err := stor.SetAccountBalance(key.Bytes(), newReceiverBalance); err != nil { return err } return nil } -func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transaction) error { - blockID := block.BlockSignature +func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transaction, stor *BalancesStorage) error { switch v := tx.(type) { case *proto.Payment: senderAddr, err := proto.NewAddressFromPublicKey(proto.MainNetScheme, v.SenderPK) @@ -159,7 +254,8 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if err != nil { return err } - senderBalance, err := s.accountsStorage.AccountBalance(senderAddr, nil) + senderKey := BalanceKey{Address: senderAddr} + senderBalance, err := stor.AccountBalance(senderKey.Bytes()) if err != nil { return err } @@ -167,23 +263,25 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if newSenderBalance < 0 { panic("Transaction results in negative balance after validation") } - if err := s.accountsStorage.SetAccountBalance(senderAddr, nil, newSenderBalance, blockID); err != nil { + if err := stor.SetAccountBalance(senderKey.Bytes(), newSenderBalance); err != nil { return err } - receiverBalance, err := s.accountsStorage.AccountBalance(v.Recipient, nil) + receiverKey := BalanceKey{Address: v.Recipient} + receiverBalance, err := stor.AccountBalance(receiverKey.Bytes()) if err != nil { return err } newReceiverBalance := receiverBalance + v.Amount - if err := s.accountsStorage.SetAccountBalance(v.Recipient, nil, newReceiverBalance, blockID); err != nil { + if err := stor.SetAccountBalance(receiverKey.Bytes(), newReceiverBalance); err != nil { return err } - minerBalance, err := s.accountsStorage.AccountBalance(minerAddr, nil) + minerKey := BalanceKey{Address: minerAddr} + minerBalance, err := stor.AccountBalance(minerKey.Bytes()) if err != nil { return err } newMinerBalance := minerBalance + v.Fee - if err := s.accountsStorage.SetAccountBalance(minerAddr, nil, newMinerBalance, blockID); err != nil { + if err := stor.SetAccountBalance(minerKey.Bytes(), newMinerBalance); err != nil { return err } return nil @@ -200,7 +298,9 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if err != nil { return err } - senderFeeBalance, err := s.accountsStorage.AccountBalance(senderAddr, v.FeeAsset.ToID()) + senderFeeKey := BalanceKey{Address: senderAddr, Asset: v.FeeAsset.ToID()} + senderAmountKey := BalanceKey{Address: senderAddr, Asset: v.AmountAsset.ToID()} + senderFeeBalance, err := stor.AccountBalance(senderFeeKey.Bytes()) if err != nil { return err } @@ -208,7 +308,7 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if newSenderFeeBalance < 0 { panic("Transaction results in negative balance after validation") } - senderAmountBalance, err := s.accountsStorage.AccountBalance(senderAddr, v.AmountAsset.ToID()) + senderAmountBalance, err := stor.AccountBalance(senderAmountKey.Bytes()) if err != nil { return err } @@ -216,26 +316,28 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if newSenderAmountBalance < 0 { panic("Transaction results in negative balance after validation") } - if err := s.accountsStorage.SetAccountBalance(senderAddr, v.FeeAsset.ToID(), newSenderFeeBalance, blockID); err != nil { + if err := stor.SetAccountBalance(senderFeeKey.Bytes(), newSenderFeeBalance); err != nil { return err } - if err := s.accountsStorage.SetAccountBalance(senderAddr, v.AmountAsset.ToID(), newSenderAmountBalance, blockID); err != nil { + if err := stor.SetAccountBalance(senderAmountKey.Bytes(), newSenderAmountBalance); err != nil { return err } - receiverBalance, err := s.accountsStorage.AccountBalance(*v.Recipient.Address, v.AmountAsset.ToID()) + receiverKey := BalanceKey{Address: *v.Recipient.Address, Asset: v.AmountAsset.ToID()} + receiverBalance, err := stor.AccountBalance(receiverKey.Bytes()) if err != nil { return err } newReceiverBalance := receiverBalance + v.Amount - if err := s.accountsStorage.SetAccountBalance(*v.Recipient.Address, v.AmountAsset.ToID(), newReceiverBalance, blockID); err != nil { + if err := stor.SetAccountBalance(receiverKey.Bytes(), newReceiverBalance); err != nil { return err } - minerBalance, err := s.accountsStorage.AccountBalance(minerAddr, v.FeeAsset.ToID()) + minerKey := BalanceKey{Address: minerAddr, Asset: v.FeeAsset.ToID()} + minerBalance, err := stor.AccountBalance(minerKey.Bytes()) if err != nil { return err } newMinerBalance := minerBalance + v.Fee - if err := s.accountsStorage.SetAccountBalance(minerAddr, v.FeeAsset.ToID(), newMinerBalance, blockID); err != nil { + if err := stor.SetAccountBalance(minerKey.Bytes(), newMinerBalance); err != nil { return err } return nil @@ -252,7 +354,9 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if err != nil { return err } - senderFeeBalance, err := s.accountsStorage.AccountBalance(senderAddr, v.FeeAsset.ToID()) + senderFeeKey := BalanceKey{Address: senderAddr, Asset: v.FeeAsset.ToID()} + senderAmountKey := BalanceKey{Address: senderAddr, Asset: v.AmountAsset.ToID()} + senderFeeBalance, err := stor.AccountBalance(senderFeeKey.Bytes()) if err != nil { return err } @@ -260,7 +364,7 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if newSenderFeeBalance < 0 { panic("Transaction results in negative balance after validation") } - senderAmountBalance, err := s.accountsStorage.AccountBalance(senderAddr, v.AmountAsset.ToID()) + senderAmountBalance, err := stor.AccountBalance(senderAmountKey.Bytes()) if err != nil { return err } @@ -268,26 +372,28 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti if newSenderAmountBalance < 0 { panic("Transaction results in negative balance after validation") } - if err := s.accountsStorage.SetAccountBalance(senderAddr, v.FeeAsset.ToID(), newSenderFeeBalance, blockID); err != nil { + if err := stor.SetAccountBalance(senderFeeKey.Bytes(), newSenderFeeBalance); err != nil { return err } - if err := s.accountsStorage.SetAccountBalance(senderAddr, v.AmountAsset.ToID(), newSenderAmountBalance, blockID); err != nil { + if err := stor.SetAccountBalance(senderAmountKey.Bytes(), newSenderAmountBalance); err != nil { return err } - receiverBalance, err := s.accountsStorage.AccountBalance(*v.Recipient.Address, v.AmountAsset.ToID()) + receiverKey := BalanceKey{Address: *v.Recipient.Address, Asset: v.AmountAsset.ToID()} + receiverBalance, err := stor.AccountBalance(receiverKey.Bytes()) if err != nil { return err } newReceiverBalance := receiverBalance + v.Amount - if err := s.accountsStorage.SetAccountBalance(*v.Recipient.Address, v.AmountAsset.ToID(), newReceiverBalance, blockID); err != nil { + if err := stor.SetAccountBalance(receiverKey.Bytes(), newReceiverBalance); err != nil { return err } - minerBalance, err := s.accountsStorage.AccountBalance(minerAddr, v.FeeAsset.ToID()) + minerKey := BalanceKey{Address: minerAddr, Asset: v.FeeAsset.ToID()} + minerBalance, err := stor.AccountBalance(minerKey.Bytes()) if err != nil { return err } newMinerBalance := minerBalance + v.Fee - if err := s.accountsStorage.SetAccountBalance(minerAddr, v.FeeAsset.ToID(), newMinerBalance, blockID); err != nil { + if err := stor.SetAccountBalance(minerKey.Bytes(), newMinerBalance); err != nil { return err } return nil @@ -296,6 +402,20 @@ func (s *StateManager) performTransaction(block *proto.Block, tx proto.Transacti } } +func (s *StateManager) addChangesToBatch(stor *BalancesStorage, blockID crypto.Signature) error { + for key, balance := range stor.waves { + if err := s.accountsStorage.SetAccountBalance(key[:], balance, blockID); err != nil { + return err + } + } + for key, balance := range stor.assets { + if err := s.accountsStorage.SetAccountBalance(key[:], balance, blockID); err != nil { + return err + } + } + return nil +} + func (s *StateManager) addNewBlock(block *proto.Block, initialisation bool) error { // Indicate new block for storage. if err := s.rw.StartBlock(block.BlockSignature); err != nil { @@ -309,11 +429,16 @@ func (s *StateManager) addNewBlock(block *proto.Block, initialisation bool) erro if err := s.rw.WriteBlockHeader(block.BlockSignature, headerBytes); err != nil { return err } - tv, err := proto.NewTransactionValidator(s.genesis, s.accountsStorage) + balancesStor, err := NewBalancesStorage(s.accountsStorage) + if err != nil { + return err + } + tv, err := NewTransactionValidator(s.genesis, balancesStor) if err != nil { return err } transactions := block.Transactions + // Validate transactions. for i := 0; i < block.TransactionCount; i++ { n := int(binary.BigEndian.Uint32(transactions[0:4])) txBytes := transactions[4 : n+4] @@ -330,15 +455,24 @@ func (s *StateManager) addNewBlock(block *proto.Block, initialisation bool) erro if err = tv.ValidateTransaction(block.BlockSignature, tx, initialisation); err != nil { return errors.Wrap(err, "incorrect transaction inside of the block") } - if err = s.performTransaction(block, tx); err != nil { + if err = s.performTransaction(block, tx, balancesStor); err != nil { return errors.Wrap(err, "failed to perform the transaction") } } transactions = transactions[4+n:] } + // Write transactions from local balances storage into DB batch. + if err := s.addChangesToBatch(balancesStor, block.BlockSignature); err != nil { + return err + } + // Flush all buffers in BlockReadWriter. if err := s.rw.FinishBlock(block.BlockSignature); err != nil { return err } + // Write batch to DB. + if err := s.accountsStorage.FinishBlock(); err != nil { + return err + } return nil } @@ -352,7 +486,10 @@ func (s *StateManager) AcceptAndVerifyBlockBinary(data []byte, initialisation bo return errors.New("invalid block signature") } // Check parent. - height := s.rw.CurrentHeight() + height, err := s.rw.CurrentHeight() + if err != nil { + return err + } if height == 0 { if initialisation { if err := s.applyGenesis(); err != nil { @@ -386,23 +523,25 @@ func (s *StateManager) RollbackToHeight(height uint64) error { } func (s *StateManager) RollbackTo(removalEdge crypto.Signature) error { - if s.accountsStorage != nil { - // Rollback accounts storage. - for height := s.rw.CurrentHeight() - 1; height > 0; height-- { - blockID, err := s.rw.BlockIDByHeight(height) - if err != nil { - return errors.Errorf("failed to get block ID by height: %v\n", err) - } - if blockID == removalEdge { - break - } - if err := s.accountsStorage.RollbackBlock(blockID); err != nil { - return errors.Errorf("failed to rollback accounts storage: %v", err) - } + // Rollback accounts storage. + curHeight, err := s.rw.CurrentHeight() + if err != nil { + return err + } + for height := curHeight - 1; height > 0; height-- { + blockID, err := s.rw.BlockIDByHeight(height) + if err != nil { + return errors.Errorf("failed to get block ID by height: %v\n", err) + } + if blockID == removalEdge { + break + } + if err := s.accountsStorage.RollbackBlock(blockID); err != nil { + return errors.Errorf("failed to rollback accounts storage: %v", err) } } // Remove blocks from block storage. - if err := s.rw.RemoveBlocks(removalEdge); err != nil { + if err := s.rw.Rollback(removalEdge, true); err != nil { return errors.Errorf("failed to remove blocks from block storage: %v", err) } return nil @@ -412,5 +551,8 @@ func (s *StateManager) Close() error { if err := s.rw.Close(); err != nil { return err } + if err := s.db.Close(); err != nil { + return err + } return nil } diff --git a/pkg/state/state_test.go b/pkg/state/state_test.go index 82d7b7a7d..91c08f956 100644 --- a/pkg/state/state_test.go +++ b/pkg/state/state_test.go @@ -31,7 +31,7 @@ func TestBlockAcceptAndRollback(t *testing.T) { if err != nil { t.Fatalf("Failed to get local dir: %v\n", err) } - blocksPath := filepath.Join(dir, "..", "storage", "testdata", "blocks-10000") + blocksPath := filepath.Join(dir, "testdata", "blocks-10000") balancesPath0 := filepath.Join(dir, "testdata", "accounts-1000") balancesPath1 := filepath.Join(dir, "testdata", "accounts-900") balancesPath2 := filepath.Join(dir, "testdata", "accounts-30") @@ -53,7 +53,7 @@ func TestBlockAcceptAndRollback(t *testing.T) { } }() - if err := importer.ApplyFromFile(manager, blocksPath, BLOCKS_NUMBER, true); err != nil { + if err := importer.ApplyFromFile(manager, blocksPath, BLOCKS_NUMBER, 0, true); err != nil { t.Fatalf("Failed to import: %v\n", err) } if err := importer.CheckBalances(manager, balancesPath0); err != nil { diff --git a/pkg/storage/testdata/blocks-10000 b/pkg/state/testdata/blocks-10000 similarity index 100% rename from pkg/storage/testdata/blocks-10000 rename to pkg/state/testdata/blocks-10000 diff --git a/pkg/proto/transactions_validation.go b/pkg/state/transactions_validation.go similarity index 74% rename from pkg/proto/transactions_validation.go rename to pkg/state/transactions_validation.go index 3d62510ca..bc2d60fe3 100644 --- a/pkg/proto/transactions_validation.go +++ b/pkg/state/transactions_validation.go @@ -1,13 +1,14 @@ -package proto +package state import ( "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/proto" ) type AccountsState interface { // nil asset means Waves. - AccountBalance(addr Address, asset []byte) (uint64, error) + AccountBalance(key []byte) (uint64, error) } type TransactionValidator struct { @@ -19,13 +20,13 @@ func NewTransactionValidator(genesis crypto.Signature, state AccountsState) (*Tr return &TransactionValidator{genesis: genesis, state: state}, nil } -func (tv *TransactionValidator) IsSupported(tx Transaction) bool { +func (tv *TransactionValidator) IsSupported(tx proto.Transaction) bool { switch v := tx.(type) { - case *Genesis: + case *proto.Genesis: return true - case *Payment: + case *proto.Payment: return true - case *TransferV1: + case *proto.TransferV1: if v.FeeAsset.Present || v.AmountAsset.Present { // Only Waves for now. return false @@ -35,7 +36,7 @@ func (tv *TransactionValidator) IsSupported(tx Transaction) bool { return false } return true - case *TransferV2: + case *proto.TransferV2: if v.FeeAsset.Present || v.AmountAsset.Present { // Only Waves for now. return false @@ -51,9 +52,9 @@ func (tv *TransactionValidator) IsSupported(tx Transaction) bool { } } -func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx Transaction, initialisation bool) error { +func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx proto.Transaction, initialisation bool) error { switch v := tx.(type) { - case *Genesis: + case *proto.Genesis: if blockID == tv.genesis { if !initialisation { return errors.New("trying to add genesis transaction in new block") @@ -62,7 +63,7 @@ func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx } else { return errors.New("tried to add genesis transaction inside of non-genesis block") } - case *Payment: + case *proto.Payment: if !initialisation { return errors.New("trying to add payment transaction in new block") } @@ -83,11 +84,12 @@ func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx } // Verify the amount spent (amount and fee upper bound). totalAmount := v.Fee + v.Amount - senderAddr, err := NewAddressFromPublicKey(MainNetScheme, v.SenderPK) + senderAddr, err := proto.NewAddressFromPublicKey(proto.MainNetScheme, v.SenderPK) if err != nil { return errors.Wrap(err, "could not get address from public key") } - balance, err := tv.state.AccountBalance(senderAddr, nil) + senderKey := BalanceKey{Address: senderAddr} + balance, err := tv.state.AccountBalance(senderKey.Bytes()) if err != nil { return err } @@ -95,7 +97,7 @@ func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx return errors.Errorf("transaction verification failed: balance is %d, trying to spend %d", balance, totalAmount) } return nil - case *TransferV1: + case *proto.TransferV1: ok, err := v.Verify(v.SenderPK) if err != nil { return errors.Wrap(err, "failed to verify transaction signature") @@ -111,15 +113,17 @@ func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx return errors.New("negative fee in transaction") } // Verify the amount spent (amount and fee upper bound). - senderAddr, err := NewAddressFromPublicKey(MainNetScheme, v.SenderPK) + senderAddr, err := proto.NewAddressFromPublicKey(proto.MainNetScheme, v.SenderPK) if err != nil { return errors.Wrap(err, "Could not get address from public key") } - feeBalance, err := tv.state.AccountBalance(senderAddr, v.FeeAsset.ToID()) + senderFeeKey := BalanceKey{Address: senderAddr, Asset: v.FeeAsset.ToID()} + feeBalance, err := tv.state.AccountBalance(senderFeeKey.Bytes()) if err != nil { return err } - amountBalance, err := tv.state.AccountBalance(senderAddr, v.AmountAsset.ToID()) + senderAmountKey := BalanceKey{Address: senderAddr, Asset: v.AmountAsset.ToID()} + amountBalance, err := tv.state.AccountBalance(senderAmountKey.Bytes()) if err != nil { return err } @@ -130,7 +134,7 @@ func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx return errors.New("invalid transaction: not eough to pay the fee provided") } return nil - case *TransferV2: + case *proto.TransferV2: ok, err := v.Verify(v.SenderPK) if err != nil { return errors.Wrap(err, "failed to verify transaction signature") @@ -146,15 +150,17 @@ func (tv *TransactionValidator) ValidateTransaction(blockID crypto.Signature, tx return errors.New("negative fee in transaction") } // Verify the amount spent (amount and fee upper bound). - senderAddr, err := NewAddressFromPublicKey(MainNetScheme, v.SenderPK) + senderAddr, err := proto.NewAddressFromPublicKey(proto.MainNetScheme, v.SenderPK) if err != nil { return errors.Wrap(err, "could not get address from public key") } - feeBalance, err := tv.state.AccountBalance(senderAddr, v.FeeAsset.ToID()) + senderFeeKey := BalanceKey{Address: senderAddr, Asset: v.FeeAsset.ToID()} + feeBalance, err := tv.state.AccountBalance(senderFeeKey.Bytes()) if err != nil { return err } - amountBalance, err := tv.state.AccountBalance(senderAddr, v.AmountAsset.ToID()) + senderAmountKey := BalanceKey{Address: senderAddr, Asset: v.AmountAsset.ToID()} + amountBalance, err := tv.state.AccountBalance(senderAmountKey.Bytes()) if err != nil { return err } diff --git a/pkg/storage/accountsstorage.go b/pkg/storage/accountsstorage.go deleted file mode 100644 index 947b91c80..000000000 --- a/pkg/storage/accountsstorage.go +++ /dev/null @@ -1,318 +0,0 @@ -package storage - -import ( - "bufio" - "encoding/binary" - "io" - "os" - - "github.com/pkg/errors" - "github.com/wavesplatform/gowaves/pkg/crypto" - "github.com/wavesplatform/gowaves/pkg/keyvalue" - "github.com/wavesplatform/gowaves/pkg/proto" -) - -const ( - ROLLBACK_MAX_BLOCKS = 2000 - RECORD_SIZE = crypto.SignatureSize + 8 -) - -var ( - lastKey = []byte("last") // For addr2Index, asset2Index. -) - -type AccountsStorage struct { - globalStor keyvalue.IterableKeyVal // AddrIndex+AssetIndex -> [(blockID, balance), (blockID, balance), ...] - addr2Index keyvalue.IterableKeyVal - asset2Index keyvalue.IterableKeyVal - validIDs map[crypto.Signature]struct{} -} - -var Empty struct{} - -func toBlockID(bytes []byte) (crypto.Signature, error) { - var res crypto.Signature - if len(bytes) != crypto.SignatureSize { - return res, errors.New("failed to convert bytes to block ID: invalid length of bytes") - } - copy(res[:], bytes) - return res, nil -} - -func initIndexStores(addr2Index, asset2Index keyvalue.KeyValue) error { - has, err := addr2Index.Has(lastKey) - if err != nil { - return err - } - if !has { - lastBuf := make([]byte, 8) - binary.LittleEndian.PutUint64(lastBuf, 0) - if err := addr2Index.Put(lastKey, lastBuf); err != nil { - return err - } - } - has, err = asset2Index.Has(lastKey) - if err != nil { - return err - } - if !has { - lastBuf := make([]byte, 4) - binary.LittleEndian.PutUint32(lastBuf, 0) - if err := asset2Index.Put(lastKey, lastBuf); err != nil { - return err - } - } - return nil -} - -func NewAccountsStorage(globalStor, addr2Index, asset2Index keyvalue.IterableKeyVal, blockIdsFile string) (*AccountsStorage, error) { - validIDs := make(map[crypto.Signature]struct{}) - if blockIdsFile != "" { - blockIDs, err := os.Open(blockIdsFile) - if err != nil { - return nil, errors.Errorf("failed to open block IDs file: %v\n", err) - } - idBuf := make([]byte, crypto.SignatureSize) - r := bufio.NewReader(blockIDs) - // Copy block IDs to in-memory map. - for { - if n, err := io.ReadFull(r, idBuf); err != nil { - if err != io.EOF { - return nil, errors.Errorf("can not read block IDs from file: %v\n", err) - } - break - } else if n != crypto.SignatureSize { - return nil, errors.New("can not read ID of proper size from file") - } - blockID, err := toBlockID(idBuf) - if err != nil { - return nil, err - } - validIDs[blockID] = Empty - } - if err := blockIDs.Close(); err != nil { - return nil, errors.Errorf("failed to close block IDs file: %v\n", err) - } - } - if err := initIndexStores(addr2Index, asset2Index); err != nil { - return nil, errors.Errorf("failed to initialise index store: %v\n", err) - } - return &AccountsStorage{ - globalStor: globalStor, - addr2Index: addr2Index, - asset2Index: asset2Index, - validIDs: validIDs, - }, nil -} - -func (s *AccountsStorage) getKey(addr proto.Address, asset []byte) ([]byte, error) { - has, err := s.addr2Index.Has(addr[:]) - if err != nil { - return nil, err - } - addrIndex := make([]byte, 8) - if has { - addrIndex, err = s.addr2Index.Get(addr[:]) - if err != nil { - return nil, err - } - } else { - last, err := s.addr2Index.Get(lastKey) - if err != nil { - return nil, err - } - lastVal := binary.LittleEndian.Uint64(last) - binary.LittleEndian.PutUint64(addrIndex, lastVal+1) - if err := s.addr2Index.Put(lastKey, addrIndex); err != nil { - return nil, err - } - if err := s.addr2Index.Put(addr[:], addrIndex); err != nil { - return nil, err - } - } - if asset == nil { - // Waves. - return addrIndex, nil - } - has, err = s.asset2Index.Has(asset) - if err != nil { - return nil, err - } - assetIndex := make([]byte, 4) - if has { - assetIndex, err = s.asset2Index.Get(asset) - if err != nil { - return nil, err - } - } else { - last, err := s.asset2Index.Get(lastKey) - if err != nil { - return nil, err - } - lastVal := binary.LittleEndian.Uint32(last) - binary.LittleEndian.PutUint32(assetIndex, lastVal+1) - if err := s.asset2Index.Put(lastKey, assetIndex); err != nil { - return nil, err - } - if err := s.asset2Index.Put(asset, assetIndex); err != nil { - return nil, err - } - } - return append(addrIndex, assetIndex...), nil -} - -func (s *AccountsStorage) filterState(stateKey []byte, state []byte) ([]byte, error) { - for i := len(state); i >= RECORD_SIZE; i -= RECORD_SIZE { - record := state[i-RECORD_SIZE : i] - idBytes := record[len(record)-crypto.SignatureSize:] - blockID, err := toBlockID(idBytes) - if err != nil { - return nil, err - } - if _, ok := s.validIDs[blockID]; ok { - return state, nil - } else { - // Erase invalid (outdated due to rollbacks) record. - state = state[:i-RECORD_SIZE] - if err := s.globalStor.Put(stateKey, state); err != nil { - return nil, err - } - } - } - return state, nil -} - -func (s *AccountsStorage) WavesAddressesNumber() (uint64, error) { - iter, err := s.addr2Index.NewKeyIterator(nil) - if err != nil { - return 0, err - } - addressesNumber := uint64(0) - for iter.Next() { - if string(iter.Key()) != string(lastKey) { - addr, err := proto.NewAddressFromBytes(iter.Key()) - if err != nil { - return 0, err - } - balance, err := s.AccountBalance(addr, nil) - if err != nil { - return 0, err - } - if balance > 0 { - addressesNumber++ - } - } - } - iter.Release() - if err := iter.Error(); err != nil { - return 0, err - } - return addressesNumber, nil -} - -func (s *AccountsStorage) AccountBalance(addr proto.Address, asset []byte) (uint64, error) { - has, err := s.addr2Index.Has(addr[:]) - if err != nil { - return 0, errors.Errorf("failed to check if address exists: %v\n", err) - } - if !has { - // TODO: think about this scenario. - return 0, nil - } - if asset != nil { - has, err = s.asset2Index.Has(asset) - if err != nil { - return 0, errors.Errorf("failed to check if asset exists: %v\n", err) - } - if !has { - // TODO: think about this scenario. - return 0, nil - } - } - key, err := s.getKey(addr, asset) - if err != nil { - return 0, errors.Errorf("failed to get key from address and asset: %v\n", err) - } - state, err := s.globalStor.Get(key) - if err != nil { - return 0, errors.Errorf("failed to get state for given key: %v\n", err) - } - // Delete invalid records. - state, err = s.filterState(key, state) - if err != nil { - return 0, errors.Errorf("failed to filter state: %v\n", err) - } - if len(state) == 0 { - // There were no valid records, so the state is empty after filtering. - return 0, nil - } - balanceEnd := len(state) - crypto.SignatureSize - balance := binary.LittleEndian.Uint64(state[balanceEnd-8 : balanceEnd]) - return balance, nil -} - -func (s *AccountsStorage) newState(newRecord []byte, key []byte, blockID crypto.Signature) ([]byte, error) { - has, err := s.globalStor.Has(key) - if err != nil { - return nil, err - } - if !has { - // New state. - return newRecord, nil - } - // Get current state. - state, err := s.globalStor.Get(key) - if err != nil { - return nil, err - } - // Delete invalid records. - state, err = s.filterState(key, state) - if err != nil { - return nil, err - } - if len(state) < RECORD_SIZE { - // State is empty after filtering, new record is the first one. - return newRecord, nil - } - lastRecord := state[len(state)-RECORD_SIZE:] - idBytes := lastRecord[len(lastRecord)-crypto.SignatureSize:] - lastBlockID, err := toBlockID(idBytes) - if err != nil { - return nil, err - } - if lastBlockID == blockID { - // If the last record is the same block, rewrite it. - copy(state[len(state)-RECORD_SIZE:], newRecord) - } else { - // Append new record to the end. - state = append(state, newRecord...) - } - return state, nil -} - -func (s *AccountsStorage) SetAccountBalance(addr proto.Address, asset []byte, balance uint64, blockID crypto.Signature) error { - key, err := s.getKey(addr, asset) - if err != nil { - return errors.Errorf("failed to get key from address and asset: %v", err) - } - if _, ok := s.validIDs[blockID]; !ok { - s.validIDs[blockID] = Empty - } - // Prepare new record. - balanceBuf := make([]byte, 8) - binary.LittleEndian.PutUint64(balanceBuf, balance) - newRecord := append(balanceBuf, blockID[:]...) - state, err := s.newState(newRecord, key, blockID) - if err != nil { - return err - } - if err := s.globalStor.Put(key, state); err != nil { - return err - } - return nil -} - -func (s *AccountsStorage) RollbackBlock(blockID crypto.Signature) error { - delete(s.validIDs, blockID) - return nil -} diff --git a/pkg/util/util.go b/pkg/util/util.go index 96e37a8e2..50fd3d353 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -5,6 +5,16 @@ import ( "os" ) +func MinOf(vars ...uint64) uint64 { + min := vars[0] + for _, i := range vars { + if min > i { + min = i + } + } + return min +} + func CleanTemporaryDirs(dirs []string) error { for _, dir := range dirs { if err := os.RemoveAll(dir); err != nil {