Skip to content

Commit

Permalink
Simple Way to Parallel Verification Transaction (neo-project#1298)
Browse files Browse the repository at this point in the history
  • Loading branch information
eryeer authored and Luchuan committed Jan 10, 2020
1 parent 5cbcb5e commit 7028bbc
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusC

private bool AddTransaction(Transaction tx, bool verify)
{
if (verify && !tx.Verify(context.Snapshot, context.SendersFeeMonitor.GetSenderFee(tx.Sender)))
if (verify && tx.Verify(context.Snapshot, context.SendersFeeMonitor.GetSenderFee(tx.Sender)) != RelayResultReason.Succeed)
{
Log($"Invalid transaction: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning);
RequestChangeView(ChangeViewReason.TxInvalid);
Expand Down
61 changes: 45 additions & 16 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Ledger
{
Expand All @@ -26,6 +27,7 @@ public class Import { public IEnumerable<Block> Blocks; }
public class ImportCompleted { }
public class FillMemoryPool { public IEnumerable<Transaction> Transactions; }
public class FillCompleted { }
private class ParallelVerified { public Transaction Transaction; public bool ShouldRelay; public RelayResultReason VerifyResult; }

public static readonly uint MillisecondsPerBlock = ProtocolSettings.Default.MillisecondsPerBlock;
public const uint DecrementInterval = 2000000;
Expand Down Expand Up @@ -276,7 +278,7 @@ private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
// First remove the tx if it is unverified in the pool.
MemPool.TryRemoveUnVerified(tx.Hash, out _);
// Verify the the transaction
if (!tx.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(tx.Sender)))
if (tx.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(tx.Sender)) != RelayResultReason.Succeed)
continue;
// Add to the memory pool
MemPool.TryAdd(tx.Hash, tx);
Expand Down Expand Up @@ -395,22 +397,46 @@ private void OnNewHeaders(Header[] headers)
system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender);
}

private RelayResultReason OnNewTransaction(Transaction transaction, bool relay)
private void OnNewTransaction(Transaction transaction, bool relay)
{
RelayResultReason reason;
if (ContainsTransaction(transaction.Hash))
return RelayResultReason.AlreadyExists;
if (!MemPool.CanTransactionFitInPool(transaction))
return RelayResultReason.OutOfMemory;
if (!transaction.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(transaction.Sender)))
return RelayResultReason.Invalid;
if (!NativeContract.Policy.CheckPolicy(transaction, currentSnapshot))
return RelayResultReason.PolicyFail;

if (!MemPool.TryAdd(transaction.Hash, transaction))
return RelayResultReason.OutOfMemory;
if (relay)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = transaction });
return RelayResultReason.Succeed;
reason = RelayResultReason.AlreadyExists;
else if (!MemPool.CanTransactionFitInPool(transaction))
reason = RelayResultReason.OutOfMemory;
else
reason = transaction.VerifyForEachBlock(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(transaction.Sender));
if (reason == RelayResultReason.Succeed)
{
Task.Run(() =>
{
return new ParallelVerified
{
Transaction = transaction,
ShouldRelay = relay,
VerifyResult = transaction.VerifyParallelParts(currentSnapshot)
};
}).PipeTo(Self, Sender);
}
else
{
Sender.Tell(reason);
}
}

private void OnParallelVerified(ParallelVerified parallelVerified)
{
RelayResultReason reason = parallelVerified.VerifyResult;
if (reason == RelayResultReason.Succeed)
{
if (!MemPool.CanTransactionFitInPool(parallelVerified.Transaction))
reason = RelayResultReason.OutOfMemory;
else if (!MemPool.TryAdd(parallelVerified.Transaction.Hash, parallelVerified.Transaction))
reason = RelayResultReason.OutOfMemory;
else if (parallelVerified.ShouldRelay)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = parallelVerified.Transaction });
}
Sender.Tell(reason);
}

private void OnPersistCompleted(Block block)
Expand Down Expand Up @@ -443,7 +469,10 @@ protected override void OnReceive(object message)
break;
}
case Transaction transaction:
Sender.Tell(OnNewTransaction(transaction, true));
OnNewTransaction(transaction, true);
break;
case ParallelVerified parallelVerified:
OnParallelVerified(parallelVerified);
break;
case ConsensusPayload payload:
Sender.Tell(OnNewConsensus(payload));
Expand Down
2 changes: 1 addition & 1 deletion src/neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
// Since unverifiedSortedTxPool is ordered in an ascending manner, we take from the end.
foreach (PoolItem item in unverifiedSortedTxPool.Reverse().Take(count))
{
if (item.Tx.Reverify(snapshot, SendersFeeMonitor.GetSenderFee(item.Tx.Sender)))
if (item.Tx.VerifyForEachBlock(snapshot, SendersFeeMonitor.GetSenderFee(item.Tx.Sender)) == RelayResultReason.Succeed)
{
reverifiedItems.Add(item);
SendersFeeMonitor.AddSenderFee(item.Tx);
Expand Down
2 changes: 2 additions & 0 deletions src/neo/Ledger/RelayResultReason.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public enum RelayResultReason : byte
OutOfMemory,
UnableToVerify,
Invalid,
Expired,
InsufficientFunds,
PolicyFail,
Unknown
}
Expand Down
58 changes: 33 additions & 25 deletions src/neo/Network/P2P/Payloads/Transaction.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Neo.Cryptography;
using Neo.IO;
using Neo.IO.Json;
using Neo.Ledger;
using Neo.Persistence;
using Neo.SmartContract;
using Neo.SmartContract.Native;
Expand Down Expand Up @@ -205,25 +206,6 @@ public UInt160[] GetScriptHashesForVerifying(StoreView snapshot)
return hashes.OrderBy(p => p).ToArray();
}

public virtual bool Reverify(StoreView snapshot, BigInteger totalSenderFeeFromPool)
{
if (ValidUntilBlock <= snapshot.Height || ValidUntilBlock > snapshot.Height + MaxValidUntilBlockIncrement)
return false;
if (NativeContract.Policy.GetBlockedAccounts(snapshot).Intersect(GetScriptHashesForVerifying(snapshot)).Count() > 0)
return false;
BigInteger balance = NativeContract.GAS.BalanceOf(snapshot, Sender);
BigInteger fee = SystemFee + NetworkFee + totalSenderFeeFromPool;
if (balance < fee) return false;
UInt160[] hashes = GetScriptHashesForVerifying(snapshot);
if (hashes.Length != Witnesses.Length) return false;
for (int i = 0; i < hashes.Length; i++)
{
if (Witnesses[i].VerificationScript.Length > 0) continue;
if (snapshot.Contracts.TryGet(hashes[i]) is null) return false;
}
return true;
}

void ISerializable.Serialize(BinaryWriter writer)
{
((IVerifiable)this).SerializeUnsigned(writer);
Expand Down Expand Up @@ -279,17 +261,43 @@ public static Transaction FromJson(JObject json)

bool IInventory.Verify(StoreView snapshot)
{
return Verify(snapshot, BigInteger.Zero);
return Verify(snapshot, BigInteger.Zero) == RelayResultReason.Succeed;
}

public virtual RelayResultReason Verify(StoreView snapshot, BigInteger totalSenderFeeFromPool)
{
RelayResultReason result = VerifyForEachBlock(snapshot, totalSenderFeeFromPool);
if (result != RelayResultReason.Succeed) return result;
return VerifyParallelParts(snapshot);
}

public virtual RelayResultReason VerifyForEachBlock(StoreView snapshot, BigInteger totalSenderFeeFromPool)
{
if (ValidUntilBlock <= snapshot.Height || ValidUntilBlock > snapshot.Height + MaxValidUntilBlockIncrement)
return RelayResultReason.Expired;
UInt160[] hashes = GetScriptHashesForVerifying(snapshot);
if (NativeContract.Policy.GetBlockedAccounts(snapshot).Intersect(hashes).Any())
return RelayResultReason.PolicyFail;
BigInteger balance = NativeContract.GAS.BalanceOf(snapshot, Sender);
BigInteger fee = SystemFee + NetworkFee + totalSenderFeeFromPool;
if (balance < fee) return RelayResultReason.InsufficientFunds;
if (hashes.Length != Witnesses.Length) return RelayResultReason.Invalid;
for (int i = 0; i < hashes.Length; i++)
{
if (Witnesses[i].VerificationScript.Length > 0) continue;
if (snapshot.Contracts.TryGet(hashes[i]) is null) return RelayResultReason.Invalid;
}
return RelayResultReason.Succeed;
}

public virtual bool Verify(StoreView snapshot, BigInteger totalSenderFeeFromPool)
public RelayResultReason VerifyParallelParts(StoreView snapshot)
{
if (!Reverify(snapshot, totalSenderFeeFromPool)) return false;
int size = Size;
if (size > MaxTransactionSize) return false;
if (size > MaxTransactionSize) return RelayResultReason.Invalid;
long net_fee = NetworkFee - size * NativeContract.Policy.GetFeePerByte(snapshot);
if (net_fee < 0) return false;
return this.VerifyWitnesses(snapshot, net_fee);
if (net_fee < 0) return RelayResultReason.InsufficientFunds;
if (!this.VerifyWitnesses(snapshot, net_fee)) return RelayResultReason.Invalid;
return RelayResultReason.Succeed;
}

public StackItem ToStackItem()
Expand Down
27 changes: 8 additions & 19 deletions src/neo/Network/RPC/RpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,15 @@ private JObject GetInvokeResult(byte[] script, IVerifiable checkWitnessHashes =

private static JObject GetRelayResult(RelayResultReason reason, UInt256 hash)
{
switch (reason)
if (reason == RelayResultReason.Succeed)
{
case RelayResultReason.Succeed:
{
var ret = new JObject();
ret["hash"] = hash.ToString();
return ret;
}
case RelayResultReason.AlreadyExists:
throw new RpcException(-501, "Block or transaction already exists and cannot be sent repeatedly.");
case RelayResultReason.OutOfMemory:
throw new RpcException(-502, "The memory pool is full and no more transactions can be sent.");
case RelayResultReason.UnableToVerify:
throw new RpcException(-503, "The block cannot be validated.");
case RelayResultReason.Invalid:
throw new RpcException(-504, "Block or transaction validation failed.");
case RelayResultReason.PolicyFail:
throw new RpcException(-505, "One of the Policy filters failed.");
default:
throw new RpcException(-500, "Unknown error.");
var ret = new JObject();
ret["hash"] = hash.ToString();
return ret;
}
else
{
throw new RpcException(-500, reason.ToString());
}
}

Expand Down
8 changes: 4 additions & 4 deletions tests/neo.UnitTests/Ledger/UT_MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ private Transaction CreateTransactionWithFee(long fee)
var randomBytes = new byte[16];
random.NextBytes(randomBytes);
Mock<Transaction> mock = new Mock<Transaction>();
mock.Setup(p => p.Reverify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.VerifyForEachBlock(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Object.Script = randomBytes;
mock.Object.Sender = UInt160.Zero;
mock.Object.NetworkFee = fee;
Expand All @@ -99,8 +99,8 @@ private Transaction CreateTransactionWithFeeAndBalanceVerify(long fee)
random.NextBytes(randomBytes);
Mock<Transaction> mock = new Mock<Transaction>();
UInt160 sender = UInt160.Zero;
mock.Setup(p => p.Reverify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(((StoreView snapshot, BigInteger amount) => NativeContract.GAS.BalanceOf(snapshot, sender) >= amount + fee));
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.VerifyForEachBlock(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns((StoreView snapshot, BigInteger amount) => NativeContract.GAS.BalanceOf(snapshot, sender) >= amount + fee ? RelayResultReason.Succeed : RelayResultReason.InsufficientFunds);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Object.Script = randomBytes;
mock.Object.Sender = sender;
mock.Object.NetworkFee = fee;
Expand Down
4 changes: 2 additions & 2 deletions tests/neo.UnitTests/Ledger/UT_SendersFeeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ private Transaction CreateTransactionWithFee(long networkFee, long systemFee)
var randomBytes = new byte[16];
random.NextBytes(randomBytes);
Mock<Transaction> mock = new Mock<Transaction>();
mock.Setup(p => p.Reverify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.VerifyForEachBlock(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Object.Script = randomBytes;
mock.Object.Sender = UInt160.Zero;
mock.Object.NetworkFee = networkFee;
Expand Down
2 changes: 1 addition & 1 deletion tests/neo.UnitTests/Network/P2P/Payloads/UT_Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ public void Transaction_Reverify_Hashes_Length_Unequal_To_Witnesses_Length()
};
UInt160[] hashes = txSimple.GetScriptHashesForVerifying(snapshot);
Assert.AreEqual(2, hashes.Length);
Assert.IsFalse(txSimple.Reverify(snapshot, BigInteger.Zero));
Assert.AreNotEqual(RelayResultReason.Succeed, txSimple.VerifyForEachBlock(snapshot, BigInteger.Zero));
}

[TestMethod]
Expand Down

0 comments on commit 7028bbc

Please sign in to comment.