Skip to content

Commit

Permalink
Merge pull request #285 from ElrondNetwork/bugs/fixed-resolvers-facto…
Browse files Browse the repository at this point in the history
…ry-utx

fixed factory for unsigned transactions
  • Loading branch information
iulianpascalau authored Jul 18, 2019
2 parents 16c57aa + 8fa0cbf commit 766d5a1
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 197 deletions.
27 changes: 21 additions & 6 deletions dataRetriever/factory/shard/resolversContainerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewResolversContainerFactory(
func (rcf *resolversContainerFactory) Create() (dataRetriever.ResolversContainer, error) {
container := containers.NewResolversContainer()

keys, resolverSlice, err := rcf.generateTxResolvers(factory.TransactionTopic, dataRetriever.TransactionUnit)
keys, resolverSlice, err := rcf.generateTxResolvers(factory.TransactionTopic, dataRetriever.TransactionUnit, rcf.dataPools.Transactions())
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,11 @@ func (rcf *resolversContainerFactory) Create() (dataRetriever.ResolversContainer
return nil, err
}

keys, resolverSlice, err = rcf.generateTxResolvers(factory.UnsignedTransactionTopic, dataRetriever.UnsignedTransactionUnit)
keys, resolverSlice, err = rcf.generateTxResolvers(
factory.UnsignedTransactionTopic,
dataRetriever.UnsignedTransactionUnit,
rcf.dataPools.UnsignedTransactions(),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,7 +160,12 @@ func (rcf *resolversContainerFactory) createTopicAndAssignHandler(

//------- Tx resolvers

func (rcf *resolversContainerFactory) generateTxResolvers(topic string, unit dataRetriever.UnitType) ([]string, []dataRetriever.Resolver, error) {
func (rcf *resolversContainerFactory) generateTxResolvers(
topic string,
unit dataRetriever.UnitType,
dataPool dataRetriever.ShardedDataCacherNotifier,
) ([]string, []dataRetriever.Resolver, error) {

shardC := rcf.shardCoordinator

noOfShards := shardC.NumberOfShards()
Expand All @@ -168,7 +177,7 @@ func (rcf *resolversContainerFactory) generateTxResolvers(topic string, unit dat
identifierTx := topic + shardC.CommunicationIdentifier(idx)
excludePeersFromTopic := topic + shardC.CommunicationIdentifier(shardC.SelfId())

resolver, err := rcf.createTxResolver(identifierTx, excludePeersFromTopic, unit)
resolver, err := rcf.createTxResolver(identifierTx, excludePeersFromTopic, unit, dataPool)
if err != nil {
return nil, nil, err
}
Expand All @@ -180,7 +189,13 @@ func (rcf *resolversContainerFactory) generateTxResolvers(topic string, unit dat
return keys, resolverSlice, nil
}

func (rcf *resolversContainerFactory) createTxResolver(topic string, excludedTopic string, unit dataRetriever.UnitType) (dataRetriever.Resolver, error) {
func (rcf *resolversContainerFactory) createTxResolver(
topic string,
excludedTopic string,
unit dataRetriever.UnitType,
dataPool dataRetriever.ShardedDataCacherNotifier,
) (dataRetriever.Resolver, error) {

txStorer := rcf.store.GetStorer(unit)

//TODO instantiate topic sender resolver with the shard IDs for which this resolver is supposed to serve the data
Expand All @@ -199,7 +214,7 @@ func (rcf *resolversContainerFactory) createTxResolver(topic string, excludedTop

resolver, err := resolvers.NewTxResolver(
resolverSender,
rcf.dataPools.Transactions(),
dataPool,
txStorer,
rcf.marshalizer,
rcf.dataPacker,
Expand Down
53 changes: 3 additions & 50 deletions integrationTests/singleShard/transaction/interceptedBulkTx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/state/addressConverters"
"github.com/ElrondNetwork/elrond-go/data/transaction"
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestNode_GenerateSendInterceptBulkTransactionsWithMessenger(t *testing.T) {

mut := sync.Mutex{}
txHashes := make([][]byte, 0)
transactions := make([]*transaction.Transaction, 0)
transactions := make([]data.TransactionHandler, 0)

//wire up handler
dPool.Transactions().RegisterHandler(func(key []byte) {
Expand Down Expand Up @@ -99,53 +100,5 @@ func TestNode_GenerateSendInterceptBulkTransactionsWithMessenger(t *testing.T) {
return
}

if noOfTx != len(txHashes) {

for i := startingNonce; i < startingNonce+uint64(noOfTx); i++ {
found := false

for _, tx := range transactions {
if tx.Nonce == i {
found = true
break
}
}

if !found {
fmt.Printf("tx with nonce %d is missing\n", i)
}

}

assert.Fail(t, fmt.Sprintf("should have been %d, got %d", noOfTx, len(txHashes)))

return
}

bitmap := make([]bool, noOfTx+int(startingNonce))
//set for each nonce from found tx a true flag in bitmap
for i := 0; i < noOfTx; i++ {
val, _ := dPool.Transactions().ShardDataStore(
process.ShardCacherIdentifier(shardCoordinator.SelfId(), shardCoordinator.SelfId()),
).Get(txHashes[i])

if val == nil {
continue
}

tx := val.(*transaction.Transaction)

bitmap[tx.Nonce] = true
}

//for the first startingNonce values, the bitmap should be false
//for the rest, true
for i := 0; i < noOfTx+int(startingNonce); i++ {
if i < int(startingNonce) {
assert.False(t, bitmap[i])
continue
}

assert.True(t, bitmap[i])
}
checkResults(t, startingNonce, noOfTx, txHashes, transactions, dPool.Transactions(), shardCoordinator)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package transaction

import (
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
"math/big"
"sync"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/partitioning"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/smartContractResult"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/state/addressConverters"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/stretchr/testify/assert"
)

func TestNode_GenerateSendInterceptBulkUnsignedTransactionsWithMessenger(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

dPool := createTestDataPool()
startingNonce := uint64(6)
addrConverter, _ := addressConverters.NewPlainAddressConverter(32, "0x")
accntAdapter := createAccountsDB()
shardCoordinator := &sharding.OneShardCoordinator{}
n, mes, sk, _ := createNetNode(dPool, accntAdapter, shardCoordinator)

_ = n.Start()
defer func() {
_ = n.Stop()
}()

_ = n.P2PBootstrap()

time.Sleep(time.Second)

//set the account's nonce to startingNonce
nodePubKeyBytes, _ := sk.GeneratePublic().ToByteArray()
nodeAddress, _ := addrConverter.CreateAddressFromPublicKeyBytes(nodePubKeyBytes)
nodeAccount, _ := accntAdapter.GetAccountWithJournal(nodeAddress)
_ = nodeAccount.(*state.Account).SetNonceWithJournal(startingNonce)
_, _ = accntAdapter.Commit()

noOfUnsignedTx := 8000

time.Sleep(time.Second)

wg := sync.WaitGroup{}
wg.Add(noOfUnsignedTx)

chanDone := make(chan bool)

go func() {
wg.Wait()
chanDone <- true
}()

mut := sync.Mutex{}
unsignedtxHashes := make([][]byte, 0)
unsignedTransactions := make([]data.TransactionHandler, 0)

//wire up handler
dPool.UnsignedTransactions().RegisterHandler(func(key []byte) {
mut.Lock()
defer mut.Unlock()

unsignedtxHashes = append(unsignedtxHashes, key)

dataStore := dPool.UnsignedTransactions().ShardDataStore(
process.ShardCacherIdentifier(shardCoordinator.SelfId(), shardCoordinator.SelfId()),
)
val, _ := dataStore.Get(key)
if val == nil {
assert.Fail(t, fmt.Sprintf("key %s not in store?", base64.StdEncoding.EncodeToString(key)))
return
}

unsignedTransactions = append(unsignedTransactions, val.(*smartContractResult.SmartContractResult))
wg.Done()
})

err := generateAndSendBulkSmartContractResults(
startingNonce,
noOfUnsignedTx,
testMarshalizer,
shardCoordinator,
mes,
)

assert.Nil(t, err)

select {
case <-chanDone:
case <-time.After(time.Second * 60):
assert.Fail(t, "timeout")
return
}

checkResults(t, startingNonce, noOfUnsignedTx, unsignedtxHashes, unsignedTransactions, dPool.UnsignedTransactions(), shardCoordinator)
}

func generateAndSendBulkSmartContractResults(
startingNonce uint64,
noOfUnsignedTx int,
marshalizer marshal.Marshalizer,
shardCoordinator sharding.Coordinator,
messenger p2p.Messenger,
) error {

dataPacker, err := partitioning.NewSizeDataPacker(marshalizer)
if err != nil {
return err
}

sender := make([]byte, 32)
_, _ = rand.Reader.Read(sender)

dest := make([]byte, 32)
_, _ = rand.Reader.Read(dest)

unsigedTxs := make([][]byte, 0)
for nonce := startingNonce; nonce < startingNonce+uint64(noOfUnsignedTx); nonce++ {
uTx := &smartContractResult.SmartContractResult{
Nonce: nonce,
TxHash: []byte("tx hash"),
SndAddr: sender,
RcvAddr: dest,
Value: big.NewInt(0),
}
buff := make([]byte, 8)
binary.BigEndian.PutUint64(buff, nonce)
uTx.Data = hex.EncodeToString(buff)

uTxBytes, _ := marshalizer.Marshal(uTx)
unsigedTxs = append(unsigedTxs, uTxBytes)
}

//the topic identifier is made of the current shard id and sender's shard id
identifier := factory.UnsignedTransactionTopic + shardCoordinator.CommunicationIdentifier(shardCoordinator.SelfId())

packets, err := dataPacker.PackDataInChunks(unsigedTxs, core.MaxBulkTransactionSize)
if err != nil {
return err
}

for _, buff := range packets {
go func(bufferToSend []byte) {
messenger.BroadcastOnChannelBlocking(
identifier,
identifier,
bufferToSend,
)
}(buff)
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/ElrondNetwork/elrond-go/crypto/signing/kyber/singlesig"
"github.com/ElrondNetwork/elrond-go/data/transaction"
"github.com/ElrondNetwork/elrond-go/hashing/sha256"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
Expand All @@ -22,9 +20,6 @@ func TestNode_RequestInterceptTransactionWithMessenger(t *testing.T) {
t.Skip("this is not a short test")
}

hasher := sha256.Sha256{}
marshalizer := &marshal.JsonMarshalizer{}

dPoolRequester := createTestDataPool()
dPoolResolver := createTestDataPool()

Expand Down Expand Up @@ -62,23 +57,23 @@ func TestNode_RequestInterceptTransactionWithMessenger(t *testing.T) {
tx := transaction.Transaction{
Nonce: 0,
Value: big.NewInt(0),
RcvAddr: hasher.Compute("receiver"),
RcvAddr: testHasher.Compute("receiver"),
SndAddr: buffPk1,
Data: "tx notarized data",
}

txBuff, _ := marshalizer.Marshal(&tx)
txBuff, _ := testMarshalizer.Marshal(&tx)
signer := &singlesig.SchnorrSigner{}

tx.Signature, _ = signer.Sign(sk1, txBuff)

signedTxBuff, _ := marshalizer.Marshal(&tx)
signedTxBuff, _ := testMarshalizer.Marshal(&tx)

fmt.Printf("Transaction: %v\n%v\n", tx, string(signedTxBuff))

chanDone := make(chan bool)

txHash := hasher.Compute(string(signedTxBuff))
txHash := testHasher.Compute(string(signedTxBuff))

//step 2. wire up a received handler for requester
dPoolRequester.Transactions().RegisterHandler(func(key []byte) {
Expand Down
Loading

0 comments on commit 766d5a1

Please sign in to comment.