Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed factory for unsigned transactions #285

Merged
merged 4 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions dataRetriever/factory/shard/resolversContainerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewResolversContainerFactory(
func (rcf *resolversContainerFactory) Create() (dataRetriever.ResolversContainer, error) {
container := containers.NewResolversContainer()

keys, resolverSlice, err := rcf.generateTxResolvers(factory.TransactionTopic, dataRetriever.TransactionUnit)
keys, resolverSlice, err := rcf.generateTxResolvers(factory.TransactionTopic, dataRetriever.TransactionUnit, rcf.dataPools.Transactions())
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,11 @@ func (rcf *resolversContainerFactory) Create() (dataRetriever.ResolversContainer
return nil, err
}

keys, resolverSlice, err = rcf.generateTxResolvers(factory.UnsignedTransactionTopic, dataRetriever.UnsignedTransactionUnit)
keys, resolverSlice, err = rcf.generateTxResolvers(
factory.UnsignedTransactionTopic,
dataRetriever.UnsignedTransactionUnit,
rcf.dataPools.UnsignedTransactions(),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -156,7 +160,12 @@ func (rcf *resolversContainerFactory) createTopicAndAssignHandler(

//------- Tx resolvers

func (rcf *resolversContainerFactory) generateTxResolvers(topic string, unit dataRetriever.UnitType) ([]string, []dataRetriever.Resolver, error) {
func (rcf *resolversContainerFactory) generateTxResolvers(
topic string,
unit dataRetriever.UnitType,
dataPool dataRetriever.ShardedDataCacherNotifier,
) ([]string, []dataRetriever.Resolver, error) {

shardC := rcf.shardCoordinator

noOfShards := shardC.NumberOfShards()
Expand All @@ -168,7 +177,7 @@ func (rcf *resolversContainerFactory) generateTxResolvers(topic string, unit dat
identifierTx := topic + shardC.CommunicationIdentifier(idx)
excludePeersFromTopic := topic + shardC.CommunicationIdentifier(shardC.SelfId())

resolver, err := rcf.createTxResolver(identifierTx, excludePeersFromTopic, unit)
resolver, err := rcf.createTxResolver(identifierTx, excludePeersFromTopic, unit, dataPool)
if err != nil {
return nil, nil, err
}
Expand All @@ -180,7 +189,13 @@ func (rcf *resolversContainerFactory) generateTxResolvers(topic string, unit dat
return keys, resolverSlice, nil
}

func (rcf *resolversContainerFactory) createTxResolver(topic string, excludedTopic string, unit dataRetriever.UnitType) (dataRetriever.Resolver, error) {
func (rcf *resolversContainerFactory) createTxResolver(
topic string,
excludedTopic string,
unit dataRetriever.UnitType,
dataPool dataRetriever.ShardedDataCacherNotifier,
) (dataRetriever.Resolver, error) {

txStorer := rcf.store.GetStorer(unit)

//TODO instantiate topic sender resolver with the shard IDs for which this resolver is supposed to serve the data
Expand All @@ -199,7 +214,7 @@ func (rcf *resolversContainerFactory) createTxResolver(topic string, excludedTop

resolver, err := resolvers.NewTxResolver(
resolverSender,
rcf.dataPools.Transactions(),
dataPool,
txStorer,
rcf.marshalizer,
rcf.dataPacker,
Expand Down
53 changes: 3 additions & 50 deletions integrationTests/singleShard/transaction/interceptedBulkTx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/state/addressConverters"
"github.com/ElrondNetwork/elrond-go/data/transaction"
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestNode_GenerateSendInterceptBulkTransactionsWithMessenger(t *testing.T) {

mut := sync.Mutex{}
txHashes := make([][]byte, 0)
transactions := make([]*transaction.Transaction, 0)
transactions := make([]data.TransactionHandler, 0)

//wire up handler
dPool.Transactions().RegisterHandler(func(key []byte) {
Expand Down Expand Up @@ -99,53 +100,5 @@ func TestNode_GenerateSendInterceptBulkTransactionsWithMessenger(t *testing.T) {
return
}

if noOfTx != len(txHashes) {

for i := startingNonce; i < startingNonce+uint64(noOfTx); i++ {
found := false

for _, tx := range transactions {
if tx.Nonce == i {
found = true
break
}
}

if !found {
fmt.Printf("tx with nonce %d is missing\n", i)
}

}

assert.Fail(t, fmt.Sprintf("should have been %d, got %d", noOfTx, len(txHashes)))

return
}

bitmap := make([]bool, noOfTx+int(startingNonce))
//set for each nonce from found tx a true flag in bitmap
for i := 0; i < noOfTx; i++ {
val, _ := dPool.Transactions().ShardDataStore(
process.ShardCacherIdentifier(shardCoordinator.SelfId(), shardCoordinator.SelfId()),
).Get(txHashes[i])

if val == nil {
continue
}

tx := val.(*transaction.Transaction)

bitmap[tx.Nonce] = true
}

//for the first startingNonce values, the bitmap should be false
//for the rest, true
for i := 0; i < noOfTx+int(startingNonce); i++ {
if i < int(startingNonce) {
assert.False(t, bitmap[i])
continue
}

assert.True(t, bitmap[i])
}
checkResults(t, startingNonce, noOfTx, txHashes, transactions, dPool.Transactions(), shardCoordinator)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package transaction

import (
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
"math/big"
"sync"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/partitioning"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/smartContractResult"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/state/addressConverters"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
"github.com/stretchr/testify/assert"
)

func TestNode_GenerateSendInterceptBulkUnsignedTransactionsWithMessenger(t *testing.T) {
if testing.Short() {
t.Skip("this is not a short test")
}

dPool := createTestDataPool()
startingNonce := uint64(6)
addrConverter, _ := addressConverters.NewPlainAddressConverter(32, "0x")
accntAdapter := createAccountsDB()
shardCoordinator := &sharding.OneShardCoordinator{}
n, mes, sk, _ := createNetNode(dPool, accntAdapter, shardCoordinator)

_ = n.Start()
defer func() {
_ = n.Stop()
}()

_ = n.P2PBootstrap()

time.Sleep(time.Second)

//set the account's nonce to startingNonce
nodePubKeyBytes, _ := sk.GeneratePublic().ToByteArray()
nodeAddress, _ := addrConverter.CreateAddressFromPublicKeyBytes(nodePubKeyBytes)
nodeAccount, _ := accntAdapter.GetAccountWithJournal(nodeAddress)
_ = nodeAccount.(*state.Account).SetNonceWithJournal(startingNonce)
_, _ = accntAdapter.Commit()

noOfUnsignedTx := 8000

time.Sleep(time.Second)

wg := sync.WaitGroup{}
wg.Add(noOfUnsignedTx)

chanDone := make(chan bool)

go func() {
wg.Wait()
chanDone <- true
}()

mut := sync.Mutex{}
unsignedtxHashes := make([][]byte, 0)
unsignedTransactions := make([]data.TransactionHandler, 0)

//wire up handler
dPool.UnsignedTransactions().RegisterHandler(func(key []byte) {
mut.Lock()
defer mut.Unlock()

unsignedtxHashes = append(unsignedtxHashes, key)

dataStore := dPool.UnsignedTransactions().ShardDataStore(
process.ShardCacherIdentifier(shardCoordinator.SelfId(), shardCoordinator.SelfId()),
)
val, _ := dataStore.Get(key)
if val == nil {
assert.Fail(t, fmt.Sprintf("key %s not in store?", base64.StdEncoding.EncodeToString(key)))
return
}

unsignedTransactions = append(unsignedTransactions, val.(*smartContractResult.SmartContractResult))
wg.Done()
})

err := generateAndSendBulkSmartContractResults(
startingNonce,
noOfUnsignedTx,
testMarshalizer,
shardCoordinator,
mes,
)

assert.Nil(t, err)

select {
case <-chanDone:
case <-time.After(time.Second * 60):
assert.Fail(t, "timeout")
return
}

checkResults(t, startingNonce, noOfUnsignedTx, unsignedtxHashes, unsignedTransactions, dPool.UnsignedTransactions(), shardCoordinator)
}

func generateAndSendBulkSmartContractResults(
startingNonce uint64,
noOfUnsignedTx int,
marshalizer marshal.Marshalizer,
shardCoordinator sharding.Coordinator,
messenger p2p.Messenger,
) error {

dataPacker, err := partitioning.NewSizeDataPacker(marshalizer)
if err != nil {
return err
}

sender := make([]byte, 32)
_, _ = rand.Reader.Read(sender)

dest := make([]byte, 32)
_, _ = rand.Reader.Read(dest)

unsigedTxs := make([][]byte, 0)
for nonce := startingNonce; nonce < startingNonce+uint64(noOfUnsignedTx); nonce++ {
uTx := &smartContractResult.SmartContractResult{
Nonce: nonce,
TxHash: []byte("tx hash"),
SndAddr: sender,
RcvAddr: dest,
Value: big.NewInt(0),
}
buff := make([]byte, 8)
binary.BigEndian.PutUint64(buff, nonce)
uTx.Data = hex.EncodeToString(buff)

uTxBytes, _ := marshalizer.Marshal(uTx)
unsigedTxs = append(unsigedTxs, uTxBytes)
}

//the topic identifier is made of the current shard id and sender's shard id
identifier := factory.UnsignedTransactionTopic + shardCoordinator.CommunicationIdentifier(shardCoordinator.SelfId())

packets, err := dataPacker.PackDataInChunks(unsigedTxs, core.MaxBulkTransactionSize)
if err != nil {
return err
}

for _, buff := range packets {
go func(bufferToSend []byte) {
messenger.BroadcastOnChannelBlocking(
identifier,
identifier,
bufferToSend,
)
}(buff)
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/ElrondNetwork/elrond-go/crypto/signing/kyber/singlesig"
"github.com/ElrondNetwork/elrond-go/data/transaction"
"github.com/ElrondNetwork/elrond-go/hashing/sha256"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
Expand All @@ -22,9 +20,6 @@ func TestNode_RequestInterceptTransactionWithMessenger(t *testing.T) {
t.Skip("this is not a short test")
}

hasher := sha256.Sha256{}
marshalizer := &marshal.JsonMarshalizer{}

dPoolRequester := createTestDataPool()
dPoolResolver := createTestDataPool()

Expand Down Expand Up @@ -62,23 +57,23 @@ func TestNode_RequestInterceptTransactionWithMessenger(t *testing.T) {
tx := transaction.Transaction{
Nonce: 0,
Value: big.NewInt(0),
RcvAddr: hasher.Compute("receiver"),
RcvAddr: testHasher.Compute("receiver"),
SndAddr: buffPk1,
Data: "tx notarized data",
}

txBuff, _ := marshalizer.Marshal(&tx)
txBuff, _ := testMarshalizer.Marshal(&tx)
signer := &singlesig.SchnorrSigner{}

tx.Signature, _ = signer.Sign(sk1, txBuff)

signedTxBuff, _ := marshalizer.Marshal(&tx)
signedTxBuff, _ := testMarshalizer.Marshal(&tx)

fmt.Printf("Transaction: %v\n%v\n", tx, string(signedTxBuff))

chanDone := make(chan bool)

txHash := hasher.Compute(string(signedTxBuff))
txHash := testHasher.Compute(string(signedTxBuff))

//step 2. wire up a received handler for requester
dPoolRequester.Transactions().RegisterHandler(func(key []byte) {
Expand Down
Loading