Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Swarm.StopAsync() #275

Merged
merged 8 commits into from
Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ To be released.

### Bug fixes

- Fixed a bug that `Swarm` reported `TaskCanceledException` as an unknown
exception while stopping. [[#275]]
- Fixed a bug that `Swarm` didn't stop properly during `Swarm.Preload()`.
[[#275]]

[#270]: https://github.com/planetarium/libplanet/pull/270
[#273]: https://github.com/planetarium/libplanet/issues/273
[#275]: https://github.com/planetarium/libplanet/pull/275
[#276]: https://github.com/planetarium/libplanet/pull/276


Expand Down
17 changes: 8 additions & 9 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,20 @@ public async Task CanNotStartTwice()
}

[Fact(Timeout = Timeout)]
public async Task CanStop()
public async Task StopAsync()
{
Swarm swarm = _swarms[0];
BlockChain<DumbAction> chain = _blockchains[0];

await swarm.StopAsync();
Task task = await StartAsync(swarm, chain);
var task = await StartAsync(swarm, chain);

Assert.True(swarm.Running);
await swarm.StopAsync();

Assert.False(swarm.Running);
await task;

Assert.False(task.IsFaulted);
}

[Fact(Timeout = Timeout)]
Expand Down Expand Up @@ -867,12 +868,10 @@ private async Task<Task> StartAsync<T>(
)
where T : IAction, new()
{
Task task = Task.Run(
async () => await swarm.StartAsync(
blockChain,
200,
cancellationToken
)
Task task = swarm.StartAsync(
blockChain,
200,
cancellationToken
);
await swarm.WaitForRunningAsync();
return task;
Expand Down
2 changes: 1 addition & 1 deletion Libplanet/Libplanet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ https://docs.libplanet.io/</Description>
</ItemGroup>

<ItemGroup>
<PackageReference Include="AsyncEnumerator" Version="2.2.1" />
<PackageReference Include="AsyncEnumerator" Version="2.2.2" />
<PackageReference Include="Bencodex" Version="0.1.0" />
<PackageReference Include="BouncyCastle.NetCore" Version="1.8.3" />
<PackageReference Include="Equals.Fody" Version="1.9.6" />
Expand Down
130 changes: 80 additions & 50 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -516,19 +516,20 @@ public async Task StartAsync<T>(

try
{
_workerCancellationTokenSource = new CancellationTokenSource();
CancellationToken workerCancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(
_workerCancellationTokenSource.Token, cancellationToken
).Token;

using (await _runningMutex.LockAsync())
{
Running = true;
await PreloadAsync(
blockChain,
cancellationToken: cancellationToken);
cancellationToken: workerCancellationToken);
}

_workerCancellationTokenSource = new CancellationTokenSource();
CancellationToken workerCancellationToken =
CancellationTokenSource.CreateLinkedTokenSource(
_workerCancellationTokenSource.Token, cancellationToken
).Token;
var tasks = new List<Task>
{
RepeatDeltaDistributionAsync(
Expand All @@ -547,7 +548,11 @@ await PreloadAsync(
tasks.Add(RefreshPermissions(workerCancellationToken));
}

await Task.WhenAny(tasks);
await await Task.WhenAny(tasks);
}
catch (TaskCanceledException e)
{
_logger.Information(e, "Task was canceled.");
}
catch (Exception e)
{
Expand Down Expand Up @@ -636,7 +641,8 @@ await socket.SendMultipartMessageAsync(
cancellationToken: token);

NetMQMessage response =
await socket.ReceiveMultipartMessageAsync();
await socket.ReceiveMultipartMessageAsync(
cancellationToken: token);
Message parsedMessage = Message.Parse(response, reply: true);
if (parsedMessage is BlockHashes blockHashes)
{
Expand All @@ -651,8 +657,7 @@ await socket.SendMultipartMessageAsync(

internal IAsyncEnumerable<Block<T>> GetBlocksAsync<T>(
Peer peer,
IEnumerable<HashDigest<SHA256>> blockHashes,
CancellationToken token = default(CancellationToken))
IEnumerable<HashDigest<SHA256>> blockHashes)
where T : IAction, new()
{
if (!_peers.ContainsKey(peer))
Expand All @@ -663,6 +668,7 @@ internal IAsyncEnumerable<Block<T>> GetBlocksAsync<T>(

return new AsyncEnumerable<Block<T>>(async yield =>
{
CancellationToken yieldToken = yield.CancellationToken;
using (var socket = new DealerSocket(ToNetMQAddress(peer)))
{
var blockHashesAsArray =
Expand All @@ -671,16 +677,17 @@ blockHashes as HashDigest<SHA256>[] ??
var request = new GetBlocks(blockHashesAsArray);
await socket.SendMultipartMessageAsync(
request.ToNetMQMessage(_privateKey),
cancellationToken: token);
cancellationToken: yieldToken);

int hashCount = blockHashesAsArray.Count();
_logger.Debug($"Required block count: {hashCount}.");
while (hashCount > 0)
_logger.Debug(
$"Required block count: {hashCount}. {yieldToken}");
while (hashCount > 0 && !yieldToken.IsCancellationRequested)
{
_logger.Debug("Receiving block...");
NetMQMessage response =
await socket.ReceiveMultipartMessageAsync(
cancellationToken: token);
cancellationToken: yieldToken);
Message parsedMessage = Message.Parse(response, true);
if (parsedMessage is Messages.Blocks blockMessage)
{
Expand Down Expand Up @@ -930,16 +937,18 @@ private async Task ReceiveMessageAsync<T>(

// Queue a task per message to avoid blocking.
#pragma warning disable CS4014
Task.Run(async () =>
{
// it's still async because some method it relies are
// async yet.
await ProcessMessageAsync(
blockChain,
message,
cancellationToken
);
});
Task.Run(
async () =>
{
// it's still async because some method it relies
// are async yet.
await ProcessMessageAsync(
blockChain,
message,
cancellationToken
);
},
cancellationToken);
#pragma warning restore CS4014
}
catch (InvalidMessageException e)
Expand All @@ -949,6 +958,10 @@ await ProcessMessageAsync(
"Could not parse NetMQMessage properly; ignore."
);
}
catch (TaskCanceledException e)
{
_logger.Information(e, "Task was canceled.");
}
catch (Exception e)
{
_logger.Error(
Expand Down Expand Up @@ -1061,14 +1074,18 @@ private async Task ProcessBlockHashes<T>(
$"Trying to GetBlocksAsync() " +
$"(using {message.Hashes.Count()} hashes)");
IAsyncEnumerable<Block<T>> fetched = GetBlocksAsync<T>(
peer, message.Hashes, cancellationToken);
peer,
message.Hashes
);

List<Block<T>> blocks = await fetched.ToListAsync();
List<Block<T>> blocks = await fetched.ToListAsync(
cancellationToken
);
_logger.Debug("GetBlocksAsync() complete.");

try
{
using (await _blockSyncMutex.LockAsync())
using (await _blockSyncMutex.LockAsync(cancellationToken))
{
await AppendBlocksAsync(
blockChain, peer, blocks, cancellationToken
Expand Down Expand Up @@ -1153,6 +1170,17 @@ await FillBlocksAsync(
peer, synced, stop, progress, cancellationToken);
break;
}

// We can't recover with TaskCanceledException and
// ObjectDisposedException. so just re-throw them.
catch (ObjectDisposedException)
{
throw;
}
catch (TaskCanceledException)
{
throw;
}
catch (Exception e)
{
if (retry > 0)
Expand Down Expand Up @@ -1228,7 +1256,7 @@ private async Task FillBlocksAsync<T>(
CancellationToken cancellationToken)
where T : IAction, new()
{
while (true)
while (!cancellationToken.IsCancellationRequested)
{
BlockLocator locator = blockChain.GetBlockLocator();
IEnumerable<HashDigest<SHA256>> hashes =
Expand All @@ -1255,29 +1283,31 @@ await GetBlockHashesAsync(

await GetBlocksAsync<T>(
peer,
hashesAsArray,
cancellationToken
).ForEachAsync(block =>
{
_logger.Debug($"Trying to append block[{block.Hash}]...");

// As actions in this block should be rendered
// after actions in stale blocks are unrendered,
// given the `render: false` option here.
blockChain.Append(
block,
DateTimeOffset.UtcNow,
render: false
);
received++;
progress?.Report(new BlockDownloadState()
hashesAsArray
).ForEachAsync(
block =>
{
TotalBlockCount = hashCount,
ReceivedBlockCount = received,
ReceivedBlockHash = block.Hash,
});
_logger.Debug($"Block[{block.Hash}] is appended.");
});
_logger.Debug(
$"Trying to append block[{block.Hash}]...");

// As actions in this block should be rendered
// after actions in stale blocks are unrendered,
// given the `render: false` option here.
blockChain.Append(
block,
DateTimeOffset.UtcNow,
render: false
);
received++;
progress?.Report(new BlockDownloadState
{
TotalBlockCount = hashCount,
ReceivedBlockCount = received,
ReceivedBlockHash = block.Hash,
});
_logger.Debug($"Block[{block.Hash}] is appended.");
},
cancellationToken);
}
}

Expand Down