Skip to content

Commit

Permalink
NonblockRenderer<T>
Browse files Browse the repository at this point in the history
  • Loading branch information
dahlia committed Aug 3, 2021
1 parent 368f0e2 commit a840662
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ To be released.

### Added APIs

- Added `NonblockRenderer<T>` class. [[#1402], [#1422]]

### Behavioral changes

### Bug fixes

### CLI tools

[#1402]: https://github.com/planetarium/libplanet/issues/1402
[#1422]: https://github.com/planetarium/libplanet/pull/1422


Version 0.13.1
--------------
Expand Down
86 changes: 86 additions & 0 deletions Libplanet.Tests/Blockchain/Renderers/NonblockRendererTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Threading;
using Libplanet.Blockchain.Renderers;
using Libplanet.Blocks;
using Libplanet.Tests.Common.Action;
using xRetry;
using Xunit;

namespace Libplanet.Tests.Blockchain.Renderers
{
public class NonblockRendererTest
{
private static HashAlgorithmType _hashAlgorithm = HashAlgorithmType.Of<SHA256>();

private static Block<DumbAction> _genesis =
TestUtils.MineGenesis<DumbAction>(_ => _hashAlgorithm, default(Address));

private static Block<DumbAction> _blockA =
TestUtils.MineNext(_genesis, _ => _hashAlgorithm);

private static Block<DumbAction> _blockB =
TestUtils.MineNext(_genesis, _ => _hashAlgorithm);

[RetryFact]
public void Test()
{
const int sleepSeconds = 1;
var log = new List<string>();
var innerRenderer = new AnonymousRenderer<DumbAction>()
{
BlockRenderer = (Block<DumbAction> oldTip, Block<DumbAction> newTip) =>
{
Thread.Sleep(sleepSeconds * 1000);
log.Add($"Block({oldTip.Index}, {newTip.Index})");
},
ReorgRenderer = (
Block<DumbAction> oldTip,
Block<DumbAction> newTip,
Block<DumbAction> branchpoint
) =>
{
Thread.Sleep(sleepSeconds * 1000);
log.Add($"Reorg({oldTip.Index}, {newTip.Index}, {branchpoint.Index})");
},
ReorgEndRenderer = (
Block<DumbAction> oldTip,
Block<DumbAction> newTip,
Block<DumbAction> branchpoint
) =>
{
Thread.Sleep(sleepSeconds * 1000);
log.Add($"ReorgEnd({oldTip.Index}, {newTip.Index}, {branchpoint.Index})");
},
};
using (var renderer = new NonblockRenderer<DumbAction>(
innerRenderer, 3, NonblockRenderer<DumbAction>.FullMode.DropNewest))
{
DateTimeOffset start = DateTimeOffset.UtcNow;
renderer.RenderReorg(_blockA, _blockB, _genesis);
Assert.Empty(log);
renderer.RenderBlock(_blockA, _blockB);
Assert.Empty(log);
renderer.RenderReorgEnd(_blockA, _blockB, _genesis);
Assert.Empty(log);
DateTimeOffset end = DateTimeOffset.UtcNow;
TimeSpan elapsed = end - start;
Assert.True(
elapsed < TimeSpan.FromSeconds(3 * sleepSeconds),
$"Elapsed more than {3 * sleepSeconds} seconds ({elapsed}); seems blocking."
);
}

Assert.Equal(
new[]
{
"Reorg(1, 1, 0)",
"Block(1, 1)",
"ReorgEnd(1, 1, 0)",
},
log
);
}
}
}
195 changes: 195 additions & 0 deletions Libplanet/Blockchain/Renderers/NonblockRenderer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#nullable enable
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Channels;
using Libplanet.Action;
using Libplanet.Blocks;

namespace Libplanet.Blockchain.Renderers
{
/// <summary>
/// Decorates a <see cref="IRenderer{T}"/> instance and lets all rendering events be
/// non-blocking.
/// <para>Every method call on the renderer will immediately return and the rendering
/// will be performed in a background thread. Note that the order of render events is
/// still guaranteed. In other words, a later event never arrives before events earlier
/// than it.</para>
/// </summary>
/// <typeparam name="T">An <see cref="IAction"/> type. It should match to
/// <see cref="Libplanet.Blockchain.BlockChain{T}"/>'s type parameter.</typeparam>
/// <example>
/// <code><![CDATA[
/// IRenderer<ExampleAction> renderer = new SomeRenderer();
/// // Wraps the renderer with NonblockRenderer; the SomeRenderer instance becomes to receive
/// // event messages in NonblockRenderer's backround thread:
/// renderer = new NonblockRenderer<ExampleAction>(
/// renderer,
/// queue: 1024,
/// fullFallback: droppedEvent => ShowError("Too many rendering events in a short time."));
/// /// ...
/// // Should be disposed when no longer needed:
/// renderer.Dispose();
/// ]]></code>
/// </example>
/// <remarks>As rendering events become performed in a background thread instead of the main
/// thread, some graphics/UI drawings might be disallowed. In such case, communicate with the
/// main thread through <a
/// href="https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/"
/// >producer/consumer channels</a>.</remarks>
public class NonblockRenderer<T> : IRenderer<T>, IDisposable
where T : IAction, new()
{
private readonly Channel<System.Action> _channel;
private readonly ChannelWriter<System.Action> _writer;
private readonly ChannelReader<System.Action> _reader;
private readonly FullFallback? _fullFallback;
private readonly Thread _worker;

/// <summary>
/// Creates a new instance of <see cref="NonblockRenderer{T}"/> decorating the given
/// <paramref name="renderer"/> instance.
/// </summary>
/// <param name="renderer">The renderer to decorate which has the <em>actual</em>
/// implementations and receives events in a background thread.</param>
/// <param name="queue">The size of the internal event queue.</param>
/// <param name="fullMode">Specifies the behavior when the internal event queue is full so
/// that no more event can be added.</param>
[SuppressMessage(
"Microsoft.StyleCop.CSharp.ReadabilityRules",
"SA1118",
Justification = "A switch expression should be multiline.")]
public NonblockRenderer(IRenderer<T> renderer, int queue, FullMode fullMode)
: this(
renderer,
queue,
fullMode switch
{
FullMode.DropOldest => BoundedChannelFullMode.DropOldest,
_ => BoundedChannelFullMode.DropNewest,
}
)
{
}

/// <summary>
/// Creates a new instance of <see cref="NonblockRenderer{T}"/> decorating the given
/// <paramref name="renderer"/> instance.
/// </summary>
/// <param name="renderer">The renderer to decorate which has the <em>actual</em>
/// implementations and receives events in a background thread.</param>
/// <param name="queue">The size of the internal event queue.</param>
/// <param name="fullFallback">Specifies the custom behavior when the internal event
/// queue is full so that no more event can be added.</param>
public NonblockRenderer(IRenderer<T> renderer, int queue, FullFallback fullFallback)
: this(
renderer,
queue,
BoundedChannelFullMode.DropWrite,
fullFallback
)
{
}

private NonblockRenderer(
IRenderer<T> renderer,
int queue,
BoundedChannelFullMode boundedChannelFullMode,
FullFallback? fullFallback = null
)
{
Renderer = renderer;
_channel = Channel.CreateBounded<System.Action>(new BoundedChannelOptions(queue)
{
AllowSynchronousContinuations = true,
FullMode = boundedChannelFullMode,
SingleReader = true,
SingleWriter = true,
});
_writer = _channel.Writer;
_reader = _channel.Reader;
_fullFallback = fullFallback;
_worker = new Thread(async () =>
{
while (await _reader.WaitToReadAsync())
{
while (_reader.TryRead(out System.Action? action))
{
action?.Invoke();
}
}
})
{
IsBackground = true,
};
}

/// <summary>
/// Customizes behavior when the internal event queue is full so that no more event
/// can be added.
/// </summary>
/// <param name="droppedEvent">The render event failed to be queued.</param>
public delegate void FullFallback(System.Action droppedEvent);

/// <summary>
/// Specifies the behavior when the internal event queue is full so that no more event
/// can be added.
/// </summary>
public enum FullMode
{
/// <summary>
/// Drops the oldest event when the queue is full.
/// </summary>
DropOldest,

/// <summary>
/// Drops the newest event when the queue is full.
/// </summary>
DropNewest,
}

/// <summary>
/// The inner renderer which has the <em>actual</em> implementations and receives events.
/// </summary>
public IRenderer<T> Renderer { get; }

/// <inheritdoc cref="IDisposable.Dispose()"/>
public void Dispose()
{
_channel.Writer.Complete();
if (_worker.IsAlive)
{
_worker.Join();
}
}

/// <inheritdoc cref="IRenderer{T}.RenderBlock(Block{T}, Block{T})"/>
public void RenderBlock(Block<T> oldTip, Block<T> newTip) =>
Queue(() => Renderer.RenderBlock(oldTip, newTip));

/// <inheritdoc cref="IRenderer{T}.RenderReorg(Block{T}, Block{T}, Block{T})"/>
public void RenderReorg(Block<T> oldTip, Block<T> newTip, Block<T> branchpoint) =>
Queue(() => Renderer.RenderReorg(oldTip, newTip, branchpoint));

/// <inheritdoc cref="IRenderer{T}.RenderReorgEnd(Block{T}, Block{T}, Block{T})"/>
public void RenderReorgEnd(Block<T> oldTip, Block<T> newTip, Block<T> branchpoint) =>
Queue(() => Renderer.RenderReorgEnd(oldTip, newTip, branchpoint));

/// <summary>
/// Queues the callback to be executed in the worker thread.
/// </summary>
/// <param name="action">The callback to be executed in the worker thread.</param>
protected void Queue(System.Action action)
{
if (!_writer.TryWrite(action))
{
_fullFallback?.Invoke(action);
}

if (!_worker.IsAlive)
{
_worker.Start();
}
}
}
}

0 comments on commit a840662

Please sign in to comment.