Skip to content

Commit

Permalink
Accounts state & block storage (#71)
Browse files Browse the repository at this point in the history
* Add vim stuff to .gitignore

* First sketch for block storage

* Implement additional transaction verification

* Move BytesToTransaction() to proto package

* Separate transaction validation from block state

* Use Address instead of Recipient as key for accounts state

* BlockReadWriter: add NewBlock() to interface

NewBlock() indicates that next transaction is the first transaction
of new block.

* Implement blockreadwriter

* Change AccountsState interface

* Add GetBlock(height)

* Rename BlockManager to StateManager

Also:
 * Add Has() method to KeyValue interface
 * Remove unneeded TODO

* Add simple leveldb keyvalue

It implements KeyValue interface needed for blockreadwriter

* blockreadwriter: support dealing with existing files

* blockreadwriter: BlockIDByHeight() must use Rlock

* StateManager: add RollbackToHeight()

* Few fixes

* Add first tests for blockreadwriter

* BlockReadWriter: disable snappy; flush blockchainBuf in FinishBlock

Snappy requires decoding transaction by transaction, it is not possible
to decode concatentation of encoded transactions (block) at a time,
which is too consuming.

* BlockReadWriter: fix tests

* StateManager: Remove unneeded cancel func

* BlockReadWriter: add sketch for simultaneous-performance tests

* Do perform only transactions which are currently supported

* Fix Block unmarshal bug

* BlockReadWriter tests: do cleanup with defer

This way it cleans all the stuff up when tests fail as well.

* AccountsState: use asset ID as identifier, not asset itself

* Add alias condition to IsSupported()

* StateManager: Fix AddNewBlock()

* Implement and use mock accounts storage

* StateManager: check block before adding it

* StateManager: do not check parent for genesis block

* Add external utility for importing blocks

* Transactions: implement missed GetID(), add missed DataV1 to type list

* Block storage: clean IDs of removed blocks and transactions

* LevelDB keyvalue: add possibility of usage without batch

* Add missed locks

* Block storage: add deletion tests

* StateManager: add error condition if block's sig at 0 height is not genesis

* Block storage tests: use local dir for testdata

* Implement account storage

* Account storage: rewrite last record when it's the same block

* Account storage: few fixes

 * Support Waves (nil asset)
 * Support creation without Block IDs file
 * Implement forgotten addition of assets and addresses to stores.

* Add first test for account storage

* Account storage: add rollback test

* Block storage: add BlockIdsFilePath()

* Add functions to create storage objects (with temporary dirs)

Is used for testing, will be used in importer utility as well.

* Importer: use accounts storage

* Importer: separate all the functionality from cmd

This functionality will be used in tests of state package also.

* Tx types fixes

* Genesis block

* Add block height to block file's name

* Check number of Waves addresses

* State test for block acceptance

* Fix few bugs

 * StateManager: do not remove blocks from storage before rolling back accounts,
   otherwise block storage returns new height.
 * Accounts storage: do not check if ID of block to rollback is present or not,
   this is needed because some blocks may not have transactions at all.
 * Accounts storage: do not remove state when it becomes empty.
   If it is empty, then the balance is 0, but it is not absent, so
   AccountBalance() shouldn't return error.

* State: add rollback test on real blocks

* Remove extra interface TransactionExtended

* Fixes

Block marshal bug: TransactionCount was missing.
StateManager, rollback: clear blocks after rolling the state back.

* State test: check GetBlock()

* Remove unused code

* Refactoring

* StateManager: add Height(), BlockID <---> Height functions

* StateManager: check initialisation condition for zero height

* Do not use batch size

* Separate importer

* Refactoring
  • Loading branch information
zer0main authored and alexeykiselev committed Feb 12, 2019
1 parent 507431f commit 59b4d76
Show file tree
Hide file tree
Showing 24 changed files with 6,308 additions and 59 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Vim
*.swo
*.swp

.idea/
vendor/
build/
Expand Down
55 changes: 55 additions & 0 deletions cmd/importer/importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"time"

"github.com/wavesplatform/gowaves/pkg/importer"
"github.com/wavesplatform/gowaves/pkg/state"
)

var (
blockchainPath = flag.String("blockchain-path", "", "Path to binary blockchain file.")
balancesPath = flag.String("balances-path", "", "Path to JSON with correct balances after applying blocks.")
nBlocks = flag.Int("blocks-number", 1000, "Number of blocks to import.")
)

func main() {
flag.Parse()
if len(*blockchainPath) == 0 {
log.Fatalf("You must specify blockchain-path option.")
}
dataDir, err := ioutil.TempDir(os.TempDir(), "dataDir")
if err != nil {
log.Fatalf("Faied to create temp dir for data: %v\n", err)
}
manager, err := state.NewStateManager(dataDir, state.DefaultBlockStorageParams())
if err != nil {
log.Fatalf("Failed to create state manager: %v.\n", err)
}

defer func() {
if err := manager.Close(); err != nil {
log.Fatalf("Failed to close StateManager: %v\n", err)
}
if err := os.RemoveAll(dataDir); err != nil {
log.Fatalf("Failed to clean data dir: %v\n", err)
}
}()

start := time.Now()
if err := importer.ApplyFromFile(manager, *blockchainPath, *nBlocks, false); err != nil {
log.Fatalf("Failed to apply blocks: %v\n", err)
}
elapsed := time.Since(start)
fmt.Printf("Import took %s\n", elapsed)
if len(*balancesPath) != 0 {
if err := importer.CheckBalances(manager, *balancesPath); err != nil {
log.Fatalf("CheckBalances(): %v\n", err)
}
}
}
97 changes: 97 additions & 0 deletions pkg/importer/importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package importer

import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"io"
"os"

"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/proto"
)

type State interface {
AcceptAndVerifyBlockBinary(block []byte, initialisation bool) error
GetBlockByHeight(height uint64) (*proto.Block, error)
WavesAddressesNumber() (uint64, error)
AccountBalance(addr proto.Address, asset []byte) (uint64, error)
}

func ApplyFromFile(st State, blockchainPath string, nBlocks int, checkBlocks bool) error {
blockchain, err := os.Open(blockchainPath)
if err != nil {
return errors.Errorf("failed to open blockchain file: %v\n", err)
}
sb := make([]byte, 4)
buf := make([]byte, 2*1024*1024)
r := bufio.NewReader(blockchain)
for i := 0; i < nBlocks; i++ {
if _, err := io.ReadFull(r, sb); err != nil {
return err
}
size := binary.BigEndian.Uint32(sb)
block := buf[:size]
if _, err := io.ReadFull(r, block); err != nil {
return err
}
if err := st.AcceptAndVerifyBlockBinary(block, true); err != nil {
return err
}
if checkBlocks {
savedBlock, err := st.GetBlockByHeight(uint64(i))
if err != nil {
return err
}
savedBlockBytes, err := savedBlock.MarshalBinary()
if err != nil {
return err
}
if bytes.Compare(block, savedBlockBytes) != 0 {
return errors.New("accepted and returned blocks differ\n")
}
}
}
if err := blockchain.Close(); err != nil {
return errors.Errorf("failed to close blockchain file: %v\n", err)
}
return nil
}

func CheckBalances(st State, balancesPath string) error {
balances, err := os.Open(balancesPath)
if err != nil {
return errors.Errorf("failed to open balances file: %v\n", err)
}
var state map[string]uint64
jsonParser := json.NewDecoder(balances)
if err := jsonParser.Decode(&state); err != nil {
return errors.Errorf("failed to decode state: %v\n", err)
}
addressesNumber, err := st.WavesAddressesNumber()
if err != nil {
return errors.Errorf("failed to get number of waves addresses: %v\n", err)
}
properAddressesNumber := uint64(len(state))
if properAddressesNumber != addressesNumber {
return errors.Errorf("number of addresses differ: %d and %d\n", properAddressesNumber, addressesNumber)
}
for addrStr, properBalance := range state {
addr, err := proto.NewAddressFromString(addrStr)
if err != nil {
return errors.Errorf("faied to convert string to address: %v\n", err)
}
balance, err := st.AccountBalance(addr, nil)
if err != nil {
return errors.Errorf("failed to get balance: %v\n", err)
}
if balance != properBalance {
return errors.Errorf("balances for address %v differ: %d and %d\n", addr, properBalance, balance)
}
}
if err := balances.Close(); err != nil {
return errors.Errorf("failed to close balances file: %v\n", err)
}
return nil
}
22 changes: 22 additions & 0 deletions pkg/keyvalue/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package keyvalue

type KeyValue interface {
Has(key []byte) (bool, error)
Put(key, val []byte) error
Get(key []byte) ([]byte, error)
Delete(key []byte) error
Flush() error
}

type Iterator interface {
Key() []byte
Value() []byte
Next() bool
Error() error
Release()
}

type IterableKeyVal interface {
KeyValue
NewKeyIterator(prefix []byte) (Iterator, error)
}
66 changes: 66 additions & 0 deletions pkg/keyvalue/leveldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package keyvalue

import (
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)

type KeyVal struct {
db *leveldb.DB
batch *leveldb.Batch
}

func NewKeyVal(path string, withBatch bool) (*KeyVal, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
return nil, err
}
var batch *leveldb.Batch
if withBatch {
batch = new(leveldb.Batch)
}
return &KeyVal{db: db, batch: batch}, nil
}

func (k *KeyVal) Get(key []byte) ([]byte, error) {
return k.db.Get(key, nil)
}

func (k *KeyVal) Has(key []byte) (bool, error) {
return k.db.Has(key, nil)
}

func (k *KeyVal) Delete(key []byte) error {
return k.db.Delete(key, nil)
}

func (k *KeyVal) Put(key, val []byte) error {
if k.batch != nil {
k.batch.Put(key, val)
} else {
if err := k.db.Put(key, val, nil); err != nil {
return err
}
}
return nil
}

func (k *KeyVal) Flush() error {
if k.batch == nil {
return errors.New("no batch to flush")
}
if err := k.db.Write(k.batch, nil); err != nil {
return err
}
k.batch.Reset()
return nil
}

func (k *KeyVal) NewKeyIterator(prefix []byte) (Iterator, error) {
if prefix != nil {
return k.db.NewIterator(util.BytesPrefix(prefix), nil), nil
} else {
return k.db.NewIterator(nil, nil), nil
}
}
71 changes: 66 additions & 5 deletions pkg/proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proto
import (
"encoding/binary"

"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/crypto"
)

// Block is a block of the blockchain
type Block struct {
// Block info (except transactions)
type BlockHeader struct {
Version uint8
Timestamp uint64
Parent crypto.Signature
Expand All @@ -16,13 +17,60 @@ type Block struct {
GenSignature crypto.Digest
TransactionBlockLength uint32
TransactionCount int
Transactions []byte `json:"-"`
GenPublicKey crypto.PublicKey
BlockSignature crypto.Signature

Height uint64
}

func (b *BlockHeader) MarshalHeaderToBinary() ([]byte, error) {
res := make([]byte, 1+8+64+4+8+32+4)
res[0] = b.Version
binary.BigEndian.PutUint64(res[1:9], b.Timestamp)
copy(res[9:], b.Parent[:])
binary.BigEndian.PutUint32(res[73:77], b.ConsensusBlockLength)
binary.BigEndian.PutUint64(res[77:85], b.BaseTarget)
copy(res[85:117], b.GenSignature[:])
binary.BigEndian.PutUint32(res[117:121], b.TransactionBlockLength)
if b.Version == 3 {
countBuf := make([]byte, 4)
binary.BigEndian.PutUint32(countBuf, uint32(b.TransactionCount))
res = append(res, countBuf...)
} else {
res = append(res, byte(b.TransactionCount))
}
res = append(res, b.GenPublicKey[:]...)
res = append(res, b.BlockSignature[:]...)

return res, nil
}

func (b *BlockHeader) UnmarshalHeaderFromBinary(data []byte) error {
b.Version = data[0]
b.Timestamp = binary.BigEndian.Uint64(data[1:9])
copy(b.Parent[:], data[9:73])
b.ConsensusBlockLength = binary.BigEndian.Uint32(data[73:77])
b.BaseTarget = binary.BigEndian.Uint64(data[77:85])
copy(b.GenSignature[:], data[85:117])
b.TransactionBlockLength = binary.BigEndian.Uint32(data[117:121])
if b.Version == 3 {
b.TransactionCount = int(binary.BigEndian.Uint32(data[121:125]))
} else {
b.TransactionCount = int(data[121])
}

copy(b.GenPublicKey[:], data[len(data)-64-32:len(data)-64])
copy(b.BlockSignature[:], data[len(data)-64:])

return nil
}

// Block is a block of the blockchain
type Block struct {
BlockHeader
Transactions []byte `json:"-"`
}

// MarshalBinary encodes Block to binary form
func (b *Block) MarshalBinary() ([]byte, error) {
res := make([]byte, 1+8+64+4+8+32+4)
Expand All @@ -33,6 +81,13 @@ func (b *Block) MarshalBinary() ([]byte, error) {
binary.BigEndian.PutUint64(res[77:85], b.BaseTarget)
copy(res[85:117], b.GenSignature[:])
binary.BigEndian.PutUint32(res[117:121], b.TransactionBlockLength)
if b.Version == 3 {
countBuf := make([]byte, 4)
binary.BigEndian.PutUint32(countBuf, uint32(b.TransactionCount))
res = append(res, countBuf...)
} else {
res = append(res, byte(b.TransactionCount))
}
res = append(res, b.Transactions...)
res = append(res, b.GenPublicKey[:]...)
res = append(res, b.BlockSignature[:]...)
Expand All @@ -51,12 +106,18 @@ func (b *Block) UnmarshalBinary(data []byte) error {
b.TransactionBlockLength = binary.BigEndian.Uint32(data[117:121])
if b.Version == 3 {
b.TransactionCount = int(binary.BigEndian.Uint32(data[121:125]))
transBytes := data[125 : 125+b.TransactionBlockLength]
if b.TransactionBlockLength < 4 {
return errors.New("TransactionBlockLength is too small")
}
transBytes := data[125 : 125+b.TransactionBlockLength-4]
b.Transactions = make([]byte, len(transBytes))
copy(b.Transactions, transBytes)
} else {
b.TransactionCount = int(data[121])
transBytes := data[122 : 122+b.TransactionBlockLength]
if b.TransactionBlockLength < 1 {
return errors.New("TransactionBlockLength is too small")
}
transBytes := data[122 : 122+b.TransactionBlockLength-1]
b.Transactions = make([]byte, len(transBytes))
copy(b.Transactions, transBytes)
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/proto/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,31 @@ var blockTests = []blockTest{
},
}

func TestBlockMarshaling(t *testing.T) {
func TestBlockHeaderUnmarshaling(t *testing.T) {
for i, v := range blockTests {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
decoded, err := hex.DecodeString(v.hexEncoded)
if err != nil {
t.Fatal(err)
}
var b BlockHeader
if err = b.UnmarshalHeaderFromBinary(decoded); err != nil {
t.Fatal(err)
}

bytes, err := json.Marshal(b)
if err != nil {
t.Fatal(err)
}
str := string(bytes)
if str != v.jsonEncoded {
t.Error("unmarshaled to wrong json document:\nhave: ", str, "\nwant: ", v.jsonEncoded)
}
})
}
}

func TestBlockUnmarshaling(t *testing.T) {
for i, v := range blockTests {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
decoded, err := hex.DecodeString(v.hexEncoded)
Expand Down
Loading

0 comments on commit 59b4d76

Please sign in to comment.