Skip to content

Commit

Permalink
Make storing transactions atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Aug 7, 2019
1 parent ca51f0e commit 3c491cb
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 42 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ To be released.
- `Swarm<T>.PreloadAsync()` method became to report a block downloading
progress with the total number of blocks to download in the entire batch,
instead of the window size of a chunk (i.e., 500). [[#396], [#399]]
- `FileStore` and `LiteDBStore` became to gurantee atomicity of storing
transactions. [[#413]]

### Bug fixes

Expand Down Expand Up @@ -96,6 +98,7 @@ To be released.
[#396]: https://github.com/planetarium/libplanet/issues/396
[#398]: https://github.com/planetarium/libplanet/pull/398
[#399]: https://github.com/planetarium/libplanet/pull/399
[#413]: https://github.com/planetarium/libplanet/pull/413
[LiteDB #1268]: https://github.com/mbdavid/LiteDB/issues/1268


Expand Down
110 changes: 110 additions & 0 deletions Libplanet.Tests/Store/StoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Tasks;
using Libplanet.Action;
using Libplanet.Blocks;
using Libplanet.Crypto;
Expand Down Expand Up @@ -542,5 +543,114 @@ public void IndexBlockHashReturnNull()
Assert.Equal(1, Fx.Store.CountIndex(Fx.StoreNamespace));
Assert.Null(Fx.Store.IndexBlockHash(Fx.StoreNamespace, 2));
}

[Fact]
public void TxAtomicity()
{
Transaction<AtomicityTestAction> MakeTx(
System.Random random,
MD5 md5,
PrivateKey key,
int txNonce
)
{
byte[] arbitraryBytes = new byte[20];
random.NextBytes(arbitraryBytes);
byte[] digest = md5.ComputeHash(arbitraryBytes);
var action = new AtomicityTestAction
{
ArbitraryBytes = arbitraryBytes.ToImmutableArray(),
Md5Digest = digest.ToImmutableArray(),
};
return Transaction<AtomicityTestAction>.Create(
txNonce,
key,
new[] { action },
ImmutableHashSet<Address>.Empty,
DateTimeOffset.UtcNow
);
}

const int taskCount = 10;
const int txCount = 50;
var md5Hasher = MD5.Create();
Transaction<AtomicityTestAction> commonTx = MakeTx(
new System.Random(),
md5Hasher,
new PrivateKey(),
0
);
Task[] tasks = new Task[taskCount];
for (int i = 0; i < taskCount; i++)
{
var task = new Task(() =>
{
PrivateKey key = new PrivateKey();
var random = new System.Random();
var md5 = MD5.Create();
Transaction<AtomicityTestAction> tx;
for (int j = 0; j < 50; j++)
{
Fx.Store.PutTransaction(commonTx);
}
for (int j = 0; j < txCount; j++)
{
tx = MakeTx(random, md5, key, j + 1);
Fx.Store.PutTransaction(tx);
}
});
task.Start();
tasks[i] = task;
}

Task.WaitAll(tasks);

Assert.Equal(1 + (taskCount * txCount), Fx.Store.CountTransactions());
foreach (TxId txid in Fx.Store.IterateTransactionIds())
{
var tx = Fx.Store.GetTransaction<AtomicityTestAction>(txid);
tx.Validate();
Assert.Single(tx.Actions);
AtomicityTestAction action = tx.Actions[0];
Assert.Equal(
md5Hasher.ComputeHash(action.ArbitraryBytes.ToBuilder().ToArray()),
action.Md5Digest.ToBuilder().ToArray()
);
}
}

private class AtomicityTestAction : IAction
{
public ImmutableArray<byte> ArbitraryBytes { get; set; }

public ImmutableArray<byte> Md5Digest { get; set; }

public IImmutableDictionary<string, object> PlainValue =>
new Dictionary<string, object>
{
{ "bytes", ArbitraryBytes.ToBuilder().ToArray() },
{ "md5", Md5Digest.ToBuilder().ToArray() },
}.ToImmutableDictionary();

public void LoadPlainValue(IImmutableDictionary<string, object> plainValue)
{
ArbitraryBytes = (plainValue["bytes"] as byte[]).ToImmutableArray();
Md5Digest = (plainValue["md5"] as byte[]).ToImmutableArray();
}

public IAccountStateDelta Execute(IActionContext context)
{
return context.PreviousStates;
}

public void Render(IActionContext context, IAccountStateDelta nextStates)
{
}

public void Unrender(IActionContext context, IAccountStateDelta nextStates)
{
}
}
}
}
81 changes: 62 additions & 19 deletions Libplanet/Store/FileStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Runtime.Serialization.Formatters.Binary;
using System.Security.Cryptography;
using System.Text.RegularExpressions;
using System.Threading;
using Libplanet.Action;
using Libplanet.Blocks;
using Libplanet.Serialization;
Expand Down Expand Up @@ -38,6 +39,8 @@ public class FileStore : BaseStore

private readonly string _path;

private readonly ReaderWriterLockSlim _txLock;

public FileStore(string path)
{
if (File.Exists(path) && !Directory.Exists(path))
Expand All @@ -54,6 +57,8 @@ public FileStore(string path)
{
Directory.CreateDirectory(Path.Combine(_path, dir));
}

_txLock = new ReaderWriterLockSlim();
}

public string GetTransactionPath(TxId txid)
Expand Down Expand Up @@ -309,12 +314,29 @@ public override bool DeleteBlock(HashDigest<SHA256> blockHash)
public override bool DeleteTransaction(TxId txid)
{
var txFile = new FileInfo(GetTransactionPath(txid));
if (!txFile.Exists)
_txLock.EnterUpgradeableReadLock();
try
{
return false;
if (!txFile.Exists)
{
return false;
}

_txLock.EnterWriteLock();
try
{
txFile.Delete();
}
finally
{
_txLock.ExitWriteLock();
}
}
finally
{
_txLock.ExitUpgradeableReadLock();
}

txFile.Delete();
return true;
}

Expand Down Expand Up @@ -484,25 +506,31 @@ public override IEnumerable<TxId> IterateTransactionIds()
RegexOptions.IgnoreCase
);
var rootDir = new DirectoryInfo(GetTransactionPath());
foreach (DirectoryInfo prefix in rootDir.EnumerateDirectories())
_txLock.EnterReadLock();
try
{
if (!prefixRegex.IsMatch(prefix.Name))
{
continue;
}

foreach (FileInfo rest in prefix.EnumerateFiles())
foreach (DirectoryInfo prefix in rootDir.EnumerateDirectories())
{
if (!restRegex.IsMatch(rest.Name))
if (!prefixRegex.IsMatch(prefix.Name))
{
continue;
}

yield return new TxId(
ByteUtil.ParseHex(prefix.Name + rest.Name)
);
foreach (FileInfo rest in prefix.EnumerateFiles())
{
if (!restRegex.IsMatch(rest.Name))
{
continue;
}

yield return new TxId(ByteUtil.ParseHex(prefix.Name + rest.Name));
}
}
}
finally
{
_txLock.ExitReadLock();
}
}

/// <inheritdoc />
Expand All @@ -528,13 +556,28 @@ public override void PutBlock<T>(Block<T> block)

public override void PutTransaction<T>(Transaction<T> tx)
{
byte[] txBytes = tx.ToBencodex(true);
var txFile = new FileInfo(GetTransactionPath(tx.Id));
txFile.Directory.Create();
using (Stream stream = txFile.Open(
FileMode.OpenOrCreate, FileAccess.Write))
_txLock.EnterUpgradeableReadLock();
try
{
txFile.Directory.Create();
_txLock.EnterWriteLock();
try
{
using (Stream stream = txFile.Open(FileMode.OpenOrCreate, FileAccess.Write))
{
stream.Write(txBytes, 0, txBytes.Length);
}
}
finally
{
_txLock.ExitWriteLock();
}
}
finally
{
byte[] txBytes = tx.ToBencodex(true);
stream.Write(txBytes, 0, txBytes.Length);
_txLock.ExitUpgradeableReadLock();
}
}

Expand Down
Loading

0 comments on commit 3c491cb

Please sign in to comment.