From d429a71a6787484a68d084b118bcd7239c6186b6 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 5 Jun 2019 11:39:53 +0900 Subject: [PATCH 1/8] Fix tests --- Libplanet.Tests/Net/SwarmTest.cs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index a6b786a340..931a1d852d 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -110,19 +110,20 @@ public async Task CanNotStartTwice() } [Fact(Timeout = Timeout)] - public async Task CanStop() + public async Task StopAsync() { Swarm swarm = _swarms[0]; BlockChain chain = _blockchains[0]; await swarm.StopAsync(); - Task task = await StartAsync(swarm, chain); + var task = await StartAsync(swarm, chain); Assert.True(swarm.Running); await swarm.StopAsync(); Assert.False(swarm.Running); - await task; + + Assert.False(task.IsFaulted); } [Fact(Timeout = Timeout)] @@ -867,12 +868,10 @@ private async Task StartAsync( ) where T : IAction, new() { - Task task = Task.Run( - async () => await swarm.StartAsync( - blockChain, - 200, - cancellationToken - ) + Task task = swarm.StartAsync( + blockChain, + 200, + cancellationToken ); await swarm.WaitForRunningAsync(); return task; From 1aba1607c0f328dec70ba36b2944c48e333d0526 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Fri, 7 Jun 2019 17:31:55 +0900 Subject: [PATCH 2/8] Bump AsyncEnumerator --- Libplanet/Libplanet.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Libplanet/Libplanet.csproj b/Libplanet/Libplanet.csproj index 871e22d6a1..f1944208bf 100644 --- a/Libplanet/Libplanet.csproj +++ b/Libplanet/Libplanet.csproj @@ -59,7 +59,7 @@ https://docs.libplanet.io/ - + From 1d711a1217e2d5c1dc487d0915f9b60c3215ef99 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 5 Jun 2019 14:21:33 +0900 Subject: [PATCH 3/8] Ignore insignificant exception --- CHANGES.md | 4 ++++ Libplanet/Net/Swarm.cs | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 90fc3138f1..881dfbe44b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,8 +21,12 @@ To be released. ### Bug fixes +- Fixed a bug that `Swarm` reported `TaskCanceledException` as an unknown + exception while stopping. [[#275]] + [#270]: https://github.com/planetarium/libplanet/pull/270 [#273]: https://github.com/planetarium/libplanet/issues/273 +[#275]: https://github.com/planetarium/libplanet/pull/275 [#276]: https://github.com/planetarium/libplanet/pull/276 diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index aec16bf5e9..3826271bb1 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -549,6 +549,10 @@ await PreloadAsync( await Task.WhenAny(tasks); } + catch (TaskCanceledException e) + { + _logger.Information(e, "Task was canceled."); + } catch (Exception e) { _logger.Error( @@ -949,6 +953,10 @@ await ProcessMessageAsync( "Could not parse NetMQMessage properly; ignore." ); } + catch (TaskCanceledException e) + { + _logger.Information(e, "Task was canceled."); + } catch (Exception e) { _logger.Error( @@ -1153,6 +1161,10 @@ await FillBlocksAsync( peer, synced, stop, progress, cancellationToken); break; } + catch (TaskCanceledException) + { + throw; + } catch (Exception e) { if (retry > 0) From 575c06008abf1afbf200817ce3d5928394def675 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 5 Jun 2019 14:22:08 +0900 Subject: [PATCH 4/8] Add cancellation support --- Libplanet/Net/Swarm.cs | 89 +++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 3826271bb1..092b0a2714 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -516,19 +516,20 @@ public async Task StartAsync( try { + _workerCancellationTokenSource = new CancellationTokenSource(); + CancellationToken workerCancellationToken = + CancellationTokenSource.CreateLinkedTokenSource( + _workerCancellationTokenSource.Token, cancellationToken + ).Token; + using (await _runningMutex.LockAsync()) { Running = true; await PreloadAsync( blockChain, - cancellationToken: cancellationToken); + cancellationToken: workerCancellationToken); } - _workerCancellationTokenSource = new CancellationTokenSource(); - CancellationToken workerCancellationToken = - CancellationTokenSource.CreateLinkedTokenSource( - _workerCancellationTokenSource.Token, cancellationToken - ).Token; var tasks = new List { RepeatDeltaDistributionAsync( @@ -640,7 +641,8 @@ await socket.SendMultipartMessageAsync( cancellationToken: token); NetMQMessage response = - await socket.ReceiveMultipartMessageAsync(); + await socket.ReceiveMultipartMessageAsync( + cancellationToken: token); Message parsedMessage = Message.Parse(response, reply: true); if (parsedMessage is BlockHashes blockHashes) { @@ -934,16 +936,18 @@ private async Task ReceiveMessageAsync( // Queue a task per message to avoid blocking. #pragma warning disable CS4014 - Task.Run(async () => - { - // it's still async because some method it relies are - // async yet. - await ProcessMessageAsync( - blockChain, - message, - cancellationToken - ); - }); + Task.Run( + async () => + { + // it's still async because some method it relies + // are async yet. + await ProcessMessageAsync( + blockChain, + message, + cancellationToken + ); + }, + cancellationToken); #pragma warning restore CS4014 } catch (InvalidMessageException e) @@ -1071,12 +1075,14 @@ private async Task ProcessBlockHashes( IAsyncEnumerable> fetched = GetBlocksAsync( peer, message.Hashes, cancellationToken); - List> blocks = await fetched.ToListAsync(); + List> blocks = await fetched.ToListAsync( + cancellationToken + ); _logger.Debug("GetBlocksAsync() complete."); try { - using (await _blockSyncMutex.LockAsync()) + using (await _blockSyncMutex.LockAsync(cancellationToken)) { await AppendBlocksAsync( blockChain, peer, blocks, cancellationToken @@ -1240,7 +1246,7 @@ private async Task FillBlocksAsync( CancellationToken cancellationToken) where T : IAction, new() { - while (true) + while (!cancellationToken.IsCancellationRequested) { BlockLocator locator = blockChain.GetBlockLocator(); IEnumerable> hashes = @@ -1269,27 +1275,30 @@ await GetBlocksAsync( peer, hashesAsArray, cancellationToken - ).ForEachAsync(block => - { - _logger.Debug($"Trying to append block[{block.Hash}]..."); - - // As actions in this block should be rendered - // after actions in stale blocks are unrendered, - // given the `render: false` option here. - blockChain.Append( - block, - DateTimeOffset.UtcNow, - render: false - ); - received++; - progress?.Report(new BlockDownloadState() + ).ForEachAsync( + block => { - TotalBlockCount = hashCount, - ReceivedBlockCount = received, - ReceivedBlockHash = block.Hash, - }); - _logger.Debug($"Block[{block.Hash}] is appended."); - }); + _logger.Debug( + $"Trying to append block[{block.Hash}]..."); + + // As actions in this block should be rendered + // after actions in stale blocks are unrendered, + // given the `render: false` option here. + blockChain.Append( + block, + DateTimeOffset.UtcNow, + render: false + ); + received++; + progress?.Report(new BlockDownloadState + { + TotalBlockCount = hashCount, + ReceivedBlockCount = received, + ReceivedBlockHash = block.Hash, + }); + _logger.Debug($"Block[{block.Hash}] is appended."); + }, + cancellationToken); } } From 816d217d6c381da1405ea482d5ed3e277ef7f7e5 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 5 Jun 2019 14:23:19 +0900 Subject: [PATCH 5/8] Double check exit --- Libplanet/Net/Swarm.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 092b0a2714..dbb139cc28 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -548,7 +548,7 @@ await PreloadAsync( tasks.Add(RefreshPermissions(workerCancellationToken)); } - await Task.WhenAny(tasks); + await await Task.WhenAny(tasks); } catch (TaskCanceledException e) { From f0800a8fea17f29ba1a4a772be87ae97634ddb7b Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Fri, 7 Jun 2019 16:55:16 +0900 Subject: [PATCH 6/8] Yield.Token instead of cancellationToken --- Libplanet/Net/Swarm.cs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index dbb139cc28..5f5f8984cd 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -657,8 +657,7 @@ await socket.ReceiveMultipartMessageAsync( internal IAsyncEnumerable> GetBlocksAsync( Peer peer, - IEnumerable> blockHashes, - CancellationToken token = default(CancellationToken)) + IEnumerable> blockHashes) where T : IAction, new() { if (!_peers.ContainsKey(peer)) @@ -669,6 +668,7 @@ internal IAsyncEnumerable> GetBlocksAsync( return new AsyncEnumerable>(async yield => { + CancellationToken yieldToken = yield.CancellationToken; using (var socket = new DealerSocket(ToNetMQAddress(peer))) { var blockHashesAsArray = @@ -677,16 +677,17 @@ blockHashes as HashDigest[] ?? var request = new GetBlocks(blockHashesAsArray); await socket.SendMultipartMessageAsync( request.ToNetMQMessage(_privateKey), - cancellationToken: token); + cancellationToken: yieldToken); int hashCount = blockHashesAsArray.Count(); - _logger.Debug($"Required block count: {hashCount}."); - while (hashCount > 0) + _logger.Debug( + $"Required block count: {hashCount}. {yieldToken}"); + while (hashCount > 0 && !yieldToken.IsCancellationRequested) { _logger.Debug("Receiving block..."); NetMQMessage response = await socket.ReceiveMultipartMessageAsync( - cancellationToken: token); + cancellationToken: yieldToken); Message parsedMessage = Message.Parse(response, true); if (parsedMessage is Messages.Blocks blockMessage) { @@ -1073,7 +1074,9 @@ private async Task ProcessBlockHashes( $"Trying to GetBlocksAsync() " + $"(using {message.Hashes.Count()} hashes)"); IAsyncEnumerable> fetched = GetBlocksAsync( - peer, message.Hashes, cancellationToken); + peer, + message.Hashes + ); List> blocks = await fetched.ToListAsync( cancellationToken @@ -1273,8 +1276,7 @@ await GetBlockHashesAsync( await GetBlocksAsync( peer, - hashesAsArray, - cancellationToken + hashesAsArray ).ForEachAsync( block => { From b603e662c889f1b043cf5b2950c5853a87d5f883 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Fri, 7 Jun 2019 17:31:31 +0900 Subject: [PATCH 7/8] Catch ObjectDisposedException --- CHANGES.md | 2 ++ Libplanet/Net/Swarm.cs | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 881dfbe44b..4eed583ceb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,6 +23,8 @@ To be released. - Fixed a bug that `Swarm` reported `TaskCanceledException` as an unknown exception while stopping. [[#275]] +- Fixed a bug that `Swarm` didn't stop properly during `Swarm.Preload()`. + [[#275]] [#270]: https://github.com/planetarium/libplanet/pull/270 [#273]: https://github.com/planetarium/libplanet/issues/273 diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 5f5f8984cd..084c2ab26f 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -1170,6 +1170,13 @@ await FillBlocksAsync( peer, synced, stop, progress, cancellationToken); break; } + + // We can't recover with TaskCanceledException and + // ObjectDisposedException. so just re-throw them. + catch (ObjectDisposedException) + { + throw; + } catch (TaskCanceledException) { throw; From 1091414dbcc8e63c2aeb1f9b12f26af96a5c3631 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Mon, 10 Jun 2019 11:03:17 +0900 Subject: [PATCH 8/8] Apply suggestions from code review Co-Authored-By: Seunghun Lee --- CHANGES.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4eed583ceb..6f5a0e796a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,9 +21,9 @@ To be released. ### Bug fixes -- Fixed a bug that `Swarm` reported `TaskCanceledException` as an unknown + - Fixed a bug that `Swarm` reported `TaskCanceledException` as an unknown exception while stopping. [[#275]] -- Fixed a bug that `Swarm` didn't stop properly during `Swarm.Preload()`. + - Fixed a bug that `Swarm` didn't stop properly during `Swarm.Preload()`. [[#275]] [#270]: https://github.com/planetarium/libplanet/pull/270