Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distinguish the starting stages of Swarm clearly #760

Merged
merged 3 commits into from
Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ To be released.
relaying proxy concurrently. [[#744]]
- `Swarm<T>` became to throw `InvalidGenesisBlockException` when receiving
block from the nodes that have a different genesis block. [[#746]]
- `Swarm<T>` became to distinguish the starting stages clearly. In other words,
`Swarm<T>.StartAsync()` became not to call `Swarm<T>.PreloadAsync()`. [[#735], [#760]]

### Bug fixes

Expand Down Expand Up @@ -179,6 +181,7 @@ To be released.
[#728]: https://github.com/planetarium/libplanet/pull/728
[#732]: https://github.com/planetarium/libplanet/pull/732
[#734]: https://github.com/planetarium/libplanet/pull/734
[#735]: https://github.com/planetarium/libplanet/issues/735
[#736]: https://github.com/planetarium/libplanet/pull/736
[#739]: https://github.com/planetarium/libplanet/pull/739
[#744]: https://github.com/planetarium/libplanet/pull/744
Expand All @@ -189,6 +192,7 @@ To be released.
[#757]: https://github.com/planetarium/libplanet/pull/757
[#758]: https://github.com/planetarium/libplanet/pull/758
[#759]: https://github.com/planetarium/libplanet/pull/759
[#760]: https://github.com/planetarium/libplanet/pull/760


Version 0.7.0
Expand Down
2 changes: 1 addition & 1 deletion Libplanet.Benchmarks/SwarmBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private async Task<Task> StartAsync<T>(
)
where T : IAction, new()
{
Task task = swarm.StartAsync(200, 200, null, cancellationToken);
Task task = swarm.StartAsync(200, 200, cancellationToken);
await swarm.WaitForRunningAsync();
return task;
}
Expand Down
7 changes: 4 additions & 3 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ public async Task AutoConnectAfterStart()
Assert.Empty(swarmB.Peers);

await StartAsync(swarmA);
await Task.Delay(100);
Assert.Contains(swarmA.AsPeer, swarmB.Peers);
}
finally
Expand Down Expand Up @@ -1403,7 +1404,7 @@ public async Task InitialBlockDownloadStates()
var trustedStateValidators = new[] { minerSwarm.Address }.ToImmutableHashSet();

await receiverSwarm.PreloadAsync(trustedStateValidators: trustedStateValidators);
await receiverSwarm.PreloadAsync(true);
await receiverSwarm.PreloadAsync();
var state = receiverChain.GetState(address);

Assert.Equal((Text)"foo,bar,baz", state);
Expand Down Expand Up @@ -2412,7 +2413,7 @@ public async Task HandleReorgInSynchronizing()
await BootstrapAsync(miner2, miner1.AsPeer);
await BootstrapAsync(receiver, miner1.AsPeer);

var t = receiver.PreloadAsync(render: true);
var t = receiver.PreloadAsync();
await miner1.BlockChain.MineBlock(miner1.Address);
await miner2.BlockChain.MineBlock(miner2.Address);
Block<Sleep> latest = await miner2.BlockChain.MineBlock(miner2.Address);
Expand Down Expand Up @@ -2765,7 +2766,7 @@ private async Task<Task> StartAsync<T>(
)
where T : IAction, new()
{
Task task = swarm.StartAsync(200, 200, null, cancellationToken);
Task task = swarm.StartAsync(200, 200, cancellationToken);
await swarm.WaitForRunningAsync();
return task;
}
Expand Down
98 changes: 26 additions & 72 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,11 @@ public async Task StopAsync(
public async Task StartAsync(
int millisecondsDialTimeout = 15000,
int millisecondsBroadcastTxInterval = 5000,
EventHandler<PreloadBlockDownloadFailEventArgs> preloadBlockDownloadFailed = null,
CancellationToken cancellationToken = default(CancellationToken))
{
await StartAsync(
TimeSpan.FromMilliseconds(millisecondsDialTimeout),
TimeSpan.FromMilliseconds(millisecondsBroadcastTxInterval),
preloadBlockDownloadFailed: preloadBlockDownloadFailed,
cancellationToken
);
}
Expand All @@ -242,13 +240,7 @@ await StartAsync(
/// </param>
/// <param name="broadcastTxInterval">The time period of exchange of staged transactions.
/// </param>
/// <param name="preloadBlockDownloadFailed">
/// The <see cref="EventHandler" /> triggered when
/// <see cref="PreloadAsync(TimeSpan?, IProgress{PreloadState}, IImmutableSet{Address},
/// EventHandler{PreloadBlockDownloadFailEventArgs}, CancellationToken)" />
/// fails to download blocks.
/// </param>
/// /// <param name="cancellationToken">
/// <param name="cancellationToken">
/// A cancellation token used to propagate notification that this
/// operation should be canceled.
/// </param>
Expand All @@ -267,7 +259,6 @@ await StartAsync(
public async Task StartAsync(
TimeSpan dialTimeout,
TimeSpan broadcastTxInterval,
EventHandler<PreloadBlockDownloadFailEventArgs> preloadBlockDownloadFailed = null,
CancellationToken cancellationToken = default(CancellationToken))
{
var tasks = new List<Task>();
Expand All @@ -280,15 +271,6 @@ public async Task StartAsync(
_logger.Debug("Starting swarm...");
_logger.Debug("Peer information : {Peer}", AsPeer);

using (await _runningMutex.LockAsync())
{
await PreloadAsync(
dialTimeout: dialTimeout,
render: true,
cancellationToken: _cancellationToken
);
}

try
{
tasks.Add(_transport.RunAsync(_cancellationToken));
Expand Down Expand Up @@ -407,53 +389,10 @@ public string TraceTable()
/// A task without value.
/// You only can <c>await</c> until the method is completed.
/// </returns>
/// <remarks>This does not render downloaded <see cref="IAction"/>s, but fills states only.
/// If you want to render all <see cref="IAction"/>s from the genesis block to the recent
/// blocks use
/// <see cref="StartAsync(TimeSpan, TimeSpan, EventHandler{PreloadBlockDownloadFailEventArgs}, CancellationToken)"/>
/// method instead.</remarks>
/// <exception cref="AggregateException">Thrown when the given the block downloading is
/// failed and if <paramref name="blockDownloadFailed "/> is <c>null</c>.</exception>
#pragma warning restore MEN002 // Line is too long
public Task PreloadAsync(
TimeSpan? dialTimeout = null,
IProgress<PreloadState> progress = null,
IImmutableSet<Address> trustedStateValidators = null,
EventHandler<PreloadBlockDownloadFailEventArgs> blockDownloadFailed = null,
CancellationToken cancellationToken = default(CancellationToken)
)
{
return PreloadAsync(
render: false,
dialTimeout: dialTimeout,
progress: progress,
trustedStateValidators: trustedStateValidators,
blockDownloadFailed: blockDownloadFailed,
cancellationToken: cancellationToken
);
}

public async Task<BoundPeer> FindSpecificPeerAsync(
Address target,
Address searchAddress,
int depth,
BoundPeer viaPeer,
TimeSpan? timeout,
CancellationToken cancellationToken)
{
NetMQTransport netMQTransport = (NetMQTransport)_transport;
return await netMQTransport.FindSpecificPeerAsync(
target,
searchAddress,
depth,
viaPeer,
timeout,
cancellationToken);
}

// FIXME: It is not guaranteed that states will be reported in order. see issue #436, #430
internal async Task PreloadAsync(
bool render,
public async Task PreloadAsync(
TimeSpan? dialTimeout = null,
IProgress<PreloadState> progress = null,
IImmutableSet<Address> trustedStateValidators = null,
Expand Down Expand Up @@ -506,12 +445,15 @@ await DialToExistingPeers(dialTimeout, cancellationToken)
"Try to download blocks from {EndPoint}@{Address}.",
peerWithHeight.Peer.EndPoint,
peerWithHeight.Peer.Address.ToHex());

// FIXME: It is not guaranteed that states will be reported in order.
// see issue #436, #430
await SyncBehindsBlocksFromPeerAsync(
workspace,
peerWithHeight,
progress,
cancellationToken,
render
false
);
}
catch (Exception e)
Expand Down Expand Up @@ -556,14 +498,6 @@ await SyncBehindsBlocksFromPeerAsync(
// it doesn't need to receive states from other peers at all.
return;
}
else if (render)
{
// If it's already rendered by SyncBehindsBlocksFromPeersAsync() method
// it means states are already calculated so that it does not need to receive
// calculated states from trusted peers.
complete = true;
return;
}

long height = workspace.Tip.Index;

Expand All @@ -575,6 +509,8 @@ await SyncBehindsBlocksFromPeerAsync(
.OrderByDescending(pair => pair.Item2)
.Select(pair => (pair.Item1, workspace[pair.Item2.Value].Hash));

// FIXME: It is not guaranteed that states will be reported in order.
// see issue #436, #430
long? receivedStateHeight = await SyncRecentStatesFromTrustedPeersAsync(
workspace,
progress,
Expand Down Expand Up @@ -633,6 +569,24 @@ await SyncBehindsBlocksFromPeerAsync(
}
}

public async Task<BoundPeer> FindSpecificPeerAsync(
Address target,
Address searchAddress,
int depth,
BoundPeer viaPeer,
TimeSpan? timeout,
CancellationToken cancellationToken)
{
NetMQTransport netMQTransport = (NetMQTransport)_transport;
return await netMQTransport.FindSpecificPeerAsync(
target,
searchAddress,
depth,
viaPeer,
timeout,
cancellationToken);
}

internal async Task AddPeersAsync(
IEnumerable<Peer> peers,
TimeSpan? timeout,
Expand Down