Skip to content

Commit

Permalink
Give staged transactions lifetime on VolatileStagePolicy<T>
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Dec 22, 2020
1 parent cb19702 commit 1ab33aa
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 44 deletions.
13 changes: 9 additions & 4 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ To be released.

### Added APIs

- Added `IStagePolicy` interface. [[#1130], [#1131]]
- Added `VolatileStagePolicy` class. [[#1130], [#1131]]
- Added `IStagePolicy<T>` interface. [[#1130], [#1131]]
- Added `VolatileStagePolicy<T>` class. [[#1130], [#1131], [#1136]]
- Added `ITransport` interface. [[#1052]]
- Added `NetMQTransport` class which implements `ITransport`. [[#1052]]
- Added `Message` abstract class. [[#1052]]
Expand Down Expand Up @@ -74,7 +74,8 @@ To be released.
- Removed `Swarm<T>.TraceTable()` method. [[#1120]]
- Added `Swarm<T>.PeerStates` property. [[#1120]]
- Added `IProtocol` interface. [[#1120]]
- Added `KademliaProtocol` class which implements `IProtocol`. [[#1120]]
- Added `KademliaProtocol` class which implements `IProtocol`.
[[#1120], [#1135]]

### Behavioral changes

Expand All @@ -84,7 +85,9 @@ To be released.
- When a `BlockChain<T>` follows `VolatileStagePolicy<T>`, which is
Libplanet's the only built-in `IStagePolicy<T>` implementation at
the moment, as its `StagePolicy`, its staged transactions are no longer
persistent but volatile instead. [[#1130], [#1131]]
persistent but volatile instead. It also automatically purges staged
transactions after the given `Lifetime`, which is 3 hours by default.
[[#1130], [#1131], [#1136]]
- `Swarm<T>` became not to receive states from trusted peers.
[[#1061], [#1102]]
- `Swarm<T>` became not to retry when block downloading. [[#1062], [#1102]]
Expand Down Expand Up @@ -135,6 +138,8 @@ To be released.
[#1130]: https://github.com/planetarium/libplanet/issues/1130
[#1131]: https://github.com/planetarium/libplanet/pull/1131
[#1132]: https://github.com/planetarium/libplanet/pull/1132
[#1135]: https://github.com/planetarium/libplanet/pull/1135
[#1136]: https://github.com/planetarium/libplanet/pull/1136


Version 0.10.2
Expand Down
2 changes: 1 addition & 1 deletion Libplanet.Tests/Blockchain/BlockChainTest.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public void UnstageAfterAppendComplete()
{
PrivateKey privateKey = new PrivateKey();
(Address[] addresses, Transaction<DumbAction>[] txs) =
MakeFixturesForAppendTests(privateKey);
MakeFixturesForAppendTests(privateKey, epoch: DateTimeOffset.UtcNow);
var genesis = _blockChain.Genesis;

Block<DumbAction> block1 = TestUtils.MineNext(
Expand Down
20 changes: 16 additions & 4 deletions Libplanet.Tests/Blockchain/BlockChainTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,16 @@ public void GetNextTxNonce()
};
StageTransactions(txsE);

foreach (var tx in _blockChain.StagePolicy.Iterate(_blockChain))
{
_logger.Fatal(
"{Id}; {Signer}; {Nonce}; {Timestamp}",
tx.Id,
tx.Signer,
tx.Nonce,
tx.Timestamp);
}

Assert.Equal(6, _blockChain.GetNextTxNonce(address));
}

Expand Down Expand Up @@ -2011,8 +2021,10 @@ void BuildIndex(Guid id, Block<DumbAction> block)
MakeIncompleteBlockStates() =>
MakeIncompleteBlockStates(_fx.Store, _fx.StateStore);

private (Address[], Transaction<DumbAction>[])
MakeFixturesForAppendTests(PrivateKey privateKey = null)
private (Address[], Transaction<DumbAction>[]) MakeFixturesForAppendTests(
PrivateKey privateKey = null,
DateTimeOffset epoch = default
)
{
Address[] addresses =
{
Expand All @@ -2039,7 +2051,7 @@ void BuildIndex(Guid id, Block<DumbAction> block)
new DumbAction(addresses[0], "foo"),
new DumbAction(addresses[1], "bar"),
},
timestamp: DateTimeOffset.MinValue,
timestamp: epoch,
nonce: 0,
privateKey: privateKey),
_fx.MakeTransaction(
Expand All @@ -2048,7 +2060,7 @@ void BuildIndex(Guid id, Block<DumbAction> block)
new DumbAction(addresses[2], "baz"),
new DumbAction(addresses[3], "qux"),
},
timestamp: DateTimeOffset.MinValue.AddSeconds(5),
timestamp: epoch.AddSeconds(5),
nonce: 1,
privateKey: privateKey),
};
Expand Down
13 changes: 7 additions & 6 deletions Libplanet.Tests/Blockchain/Policies/StagePolicyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ namespace Libplanet.Tests.Blockchain.Policies
{
public abstract class StagePolicyTest
{
private readonly BlockPolicy<DumbAction> _policy;
private readonly DefaultStoreFixture _fx;
private readonly BlockChain<DumbAction> _chain;
private readonly Transaction<DumbAction>[] _txs;
protected readonly BlockPolicy<DumbAction> _policy;
protected readonly DefaultStoreFixture _fx;
protected readonly BlockChain<DumbAction> _chain;
protected readonly PrivateKey _key;
protected readonly Transaction<DumbAction>[] _txs;

protected StagePolicyTest()
{
Expand All @@ -27,11 +28,11 @@ protected StagePolicyTest()
_fx.StateStore,
_fx.GenesisBlock
);
var key = new PrivateKey();
_key = new PrivateKey();
_txs = Enumerable.Range(0, 5).Select(i =>
Transaction<DumbAction>.Create(
i,
key,
_key,
_fx.GenesisBlock.Hash,
Enumerable.Empty<DumbAction>()
)
Expand Down
29 changes: 29 additions & 0 deletions Libplanet.Tests/Blockchain/Policies/VolatileStagePolicyTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
using System;
using System.Linq;
using System.Threading;
using Libplanet.Blockchain.Policies;
using Libplanet.Tests.Common.Action;
using Libplanet.Tx;
using Xunit;

namespace Libplanet.Tests.Blockchain.Policies
{
Expand All @@ -10,5 +15,29 @@ public class VolatileStagePolicyTest : StagePolicyTest

protected override IStagePolicy<DumbAction> StagePolicy =>
_stagePolicy;

[Fact]
public void Lifetime()
{
TimeSpan timeBuffer = TimeSpan.FromSeconds(2.5);

Transaction<DumbAction> tx = Transaction<DumbAction>.Create(
0,
_key,
_fx.GenesisBlock.Hash,
Enumerable.Empty<DumbAction>(),
timestamp: DateTimeOffset.UtcNow - _stagePolicy.Lifetime + timeBuffer
);
_stagePolicy.Stage(_chain, tx);
Assert.True(_stagePolicy.HasStaged(_chain, tx.Id, false));
Assert.Equal(tx, _stagePolicy.Get(_chain, tx.Id, false));
Assert.Contains(tx, _stagePolicy.Iterate(_chain));

Thread.Sleep(timeBuffer * 2);

Assert.False(_stagePolicy.HasStaged(_chain, tx.Id, true));
Assert.Null(_stagePolicy.Get(_chain, tx.Id, true));
Assert.DoesNotContain(tx, _stagePolicy.Iterate(_chain));
}
}
}
4 changes: 2 additions & 2 deletions Libplanet.Tests/Store/StoreFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ public Transaction<DumbAction> MakeTransaction(
)
{
privateKey = privateKey ?? new PrivateKey();
timestamp = timestamp ??
new DateTimeOffset(2018, 11, 21, 0, 0, 0, TimeSpan.Zero);
timestamp = timestamp ?? DateTimeOffset.UtcNow;

return Transaction<DumbAction>.Create(
nonce,
privateKey,
Expand Down
5 changes: 5 additions & 0 deletions Libplanet/Blockchain/BlockChain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,11 @@ public long GetNextTxNonce(Address address)

foreach (long n in stagedTxNonces)
{
if (n < nonce)
{
continue;
}

if (n != nonce)
{
break;
Expand Down
101 changes: 74 additions & 27 deletions Libplanet/Blockchain/Policies/VolatileStagePolicy.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Libplanet.Action;
using Libplanet.Tx;
Expand All @@ -23,17 +25,41 @@ public class VolatileStagePolicy<T> : IStagePolicy<T>

/// <summary>
/// Creates a new <see cref="VolatileStagePolicy{T}"/> instance.
/// <para><see cref="Lifetime"/> is configured to 3 hours.</para>
/// </summary>
public VolatileStagePolicy()
: this(TimeSpan.FromHours(3))
{
}

/// <summary>
/// Creates a new <see cref="VolatileStagePolicy{T}"/> instance.
/// </summary>
/// <param name="lifetime">Volatilizes staged transactions older than this <paramref
/// name="lifetime"/>. See also the <see cref="Lifetime"/> property.</param>
public VolatileStagePolicy(TimeSpan lifetime)
{
Lifetime = lifetime;
_set = new ConcurrentDictionary<TxId, Transaction<T>>();
_queue = new List<TxId>();
_lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
}

/// <summary>
/// Volatilizes staged transactions older than this <see cref="Lifetime"/>.
/// <para>Note that transactions older than the lifetime never cannot be staged.</para>
/// </summary>
public TimeSpan Lifetime { get; }

/// <inheritdoc cref="IStagePolicy{T}.Stage(BlockChain{T}, Transaction{T})"/>
public void Stage(BlockChain<T> blockChain, Transaction<T> transaction)
{
if (DateTimeOffset.UtcNow - Lifetime > transaction.Timestamp)
{
// The transaction is already expired; don't stage it at all.
return;
}

try
{
if (!_set.TryAdd(transaction.Id, transaction))
Expand Down Expand Up @@ -73,46 +99,38 @@ public void Unstage(BlockChain<T> blockChain, TxId id)
}

/// <inheritdoc cref="IStagePolicy{T}.HasStaged(BlockChain{T}, TxId, bool)"/>
public bool HasStaged(BlockChain<T> blockChain, TxId id, bool includeUnstaged)
{
if (includeUnstaged)
{
return _set.ContainsKey(id);
}

_lock.EnterReadLock();
try
{
return _queue.Contains(id);
}
finally
{
_lock.ExitReadLock();
}
}
public bool HasStaged(BlockChain<T> blockChain, TxId id, bool includeUnstaged) =>
Get(blockChain, id, includeUnstaged) is { };

/// <inheritdoc cref="IStagePolicy{T}.Get(BlockChain{T}, TxId, bool)"/>
public Transaction<T>? Get(BlockChain<T> blockChain, TxId id, bool includeUnstaged)
{
if (_set.TryGetValue(id, out Transaction<T>? tx))
if (!_set.TryGetValue(id, out Transaction<T>? tx))
{
return null;
}
else if (tx.Timestamp >= DateTimeOffset.UtcNow - Lifetime)
{
if (includeUnstaged)
{
return tx;
}

_lock.EnterReadLock();
bool queued;
try
{
queued = _queue.Contains(tx.Id);
return includeUnstaged || _queue.Contains(id) ? tx : null;
}
finally
{
_lock.ExitReadLock();
}
}

return queued ? tx : null;
_lock.EnterWriteLock();
try
{
_queue.Remove(id);
_set.TryRemove(id, out _);
}
finally
{
_lock.ExitWriteLock();
}

return null;
Expand All @@ -132,13 +150,42 @@ public IEnumerable<Transaction<T>> Iterate(BlockChain<T> blockChain)
_lock.ExitReadLock();
}

DateTimeOffset exp = DateTimeOffset.UtcNow - Lifetime;
var expired = new List<TxId>();
foreach (TxId txid in queue)
{
if (_set.TryGetValue(txid, out Transaction<T>? tx))
{
yield return tx;
if (tx.Timestamp > exp)
{
yield return tx;
}
else
{
expired.Add(tx.Id);
}
}
}

if (!expired.Any())
{
yield break;
}

// Clean up expired transactions (if any exist).
_lock.EnterWriteLock();
try
{
foreach (TxId txid in expired)
{
_queue.Remove(txid);
_set.TryRemove(txid, out _);
}
}
finally
{
_lock.ExitWriteLock();
}
}
}
}

0 comments on commit 1ab33aa

Please sign in to comment.