Skip to content

Commit

Permalink
Improve ClusterClientDiscovery feature (#7274)
Browse files Browse the repository at this point in the history
* Improve ClusterClientDiscovery feature

* Update API Approval list

* Fix MNTR unit test

* Add specific receptionist path verification error handling
  • Loading branch information
Arkatufus authored Jul 3, 2024
1 parent 7f37465 commit e4e4ca4
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 167 deletions.
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2023 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.5.25</VersionPrefix>
<VersionPrefix>1.5.26</VersionPrefix>
<PackageIcon>akkalogo.png</PackageIcon>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Management" Version="1.5.26-beta3" />
<PackageReference Include="Akka.MultiNode.TestAdapter" Version="$(MultiNodeAdapterVersion)" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
using Akka.Configuration;
using Akka.Discovery;
using Akka.Discovery.Config;
using Akka.Event;
using Akka.Management.Dsl;
using Akka.MultiNode.TestAdapter;
using Akka.Remote.TestKit;
using Akka.TestKit.TestActors;
using FluentAssertions;
using FluentAssertions.Extensions;

namespace Akka.Cluster.Tools.Tests.MultiNode.Client
{
public sealed class ClusterClientDiscoverySpecConfig : MultiNodeConfig
{
public static readonly int[] HttpPorts = { 30001, 30002, 30003 };

public RoleName Client { get; }
public RoleName First { get; }
public RoleName Second { get; }
Expand All @@ -36,15 +41,19 @@ public ClusterClientDiscoverySpecConfig()
Third = Role("third");

CommonConfig = ConfigurationFactory.ParseString("""
akka.loglevel = INFO
akka.remote.dot-netty.tcp.hostname = localhost
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.hostname = localhost
akka.remote.dot-netty.tcp.port = 0
akka.remote.log-remote-lifecycle-events = off
akka.management.http.hostname = "127.0.0.1"
akka.cluster.client {
heartbeat-interval = 1d
acceptable-heartbeat-pause = 1d
reconnect-timeout = 3s
reconnect-timeout = 1s
refresh-contacts-interval = 1d
}
akka.test.filter-leeway = 10s
Expand All @@ -53,6 +62,10 @@ public ClusterClientDiscoverySpecConfig()
.WithFallback(DistributedPubSub.DefaultConfig())
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

NodeConfig(new[]{ First }, new[]{ ConfigurationFactory.ParseString($"akka.management.http.port = {HttpPorts[0]}") });
NodeConfig(new[]{ Second }, new[]{ ConfigurationFactory.ParseString($"akka.management.http.port = {HttpPorts[1]}") });
NodeConfig(new[]{ Third }, new[]{ ConfigurationFactory.ParseString($"akka.management.http.port = {HttpPorts[2]}") });

NodeConfig(new[]{ Client },
new []{
ConfigurationFactory.ParseString("""
Expand All @@ -61,15 +74,14 @@ public ClusterClientDiscoverySpecConfig()
heartbeat-interval = 1s
acceptable-heartbeat-pause = 2s
use-initial-contacts-discovery = true
reconnect-timeout = 4s
reconnect-timeout = 3s
verbose-logging = true
discovery
{
service-name = test-cluster
discovery-timeout = 10s
probe-timeout = 1s
}
}
discovery
{
Expand Down Expand Up @@ -124,58 +136,61 @@ public void ClusterClientDiscoverySpecs()

private void ClusterClient_must_startup_cluster_with_single_node()
{
Within(TimeSpan.FromSeconds(30), () =>
Join(_config.First, _config.First);

RunOn(() =>
{
Join(_config.First, _config.First);
RunOn(() =>
{
var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
}, _config.First);
EnterBarrier("cluster-started");
AkkaManagement.Get(Sys).Start().Wait();
var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
}, _config.First);

EnterBarrier("cluster-started");

RunOn(() =>
{
_discoveryService =
(ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config");
var address = GetAddress(_config.First);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
RunOn(() =>
{
_discoveryService =
(ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config");
var address = GetAddress(_config.First);
_discoveryService.TryAddEndpoint("test-cluster",
new ServiceDiscovery.ResolvedTarget(address.Host, ClusterClientDiscoverySpecConfig.HttpPorts[0]));

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(1);
}, _config.Client);
EnterBarrier("discovery-entry-added");
});
var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(1);
}, _config.Client);

EnterBarrier("discovery-entry-added");
}

private void ClusterClient_must_establish_connection_to_first_node()
{
RunOn(() =>
{
_clusterClient = Sys.ActorOf(ClusterClient.Props(ClusterClientSettings.Create(Sys)), "client1");
{
_clusterClient = Sys.ActorOf(ClusterClient.Props(ClusterClientSettings.Create(Sys)), "client1");

Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.First).Address);
}, RemainingOrDefault);
});
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.First).Address);
}, 10.Seconds());

_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity:true));
ExpectMsg<string>().Should().Be("hello");
}, _config.Client);
_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity:true));
FishForMessage(msg => msg is string).Should().Be("hello");
}, _config.Client);

EnterBarrier("established");
}

private void ClusterClient_must_down_existing_cluster()
{
RunOn(() =>
{
AkkaManagement.Get(Sys).Stop().Wait();

Cluster.Get(Sys).Leave(Node(_config.First).Address);
}, _config.First);

Expand All @@ -184,11 +199,13 @@ private void ClusterClient_must_down_existing_cluster()
RunOn(() =>
{
var address = GetAddress(_config.First);
_discoveryService.TryRemoveEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
_discoveryService.TryRemoveEndpoint("test-cluster",
new ServiceDiscovery.ResolvedTarget(address.Host, ClusterClientDiscoverySpecConfig.HttpPorts[0]));

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(0);
}, _config.Client);

EnterBarrier("discovery-entry-removed");

}
Expand All @@ -198,6 +215,8 @@ private void ClusterClient_second_node_must_form_a_new_cluster()
Join(_config.Second, _config.Second);
RunOn(() =>
{
AkkaManagement.Get(Sys).Start().Wait();

var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
Expand All @@ -208,43 +227,45 @@ private void ClusterClient_second_node_must_form_a_new_cluster()
RunOn(() =>
{
var address = GetAddress(_config.Second);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
_discoveryService.TryAddEndpoint("test-cluster",
new ServiceDiscovery.ResolvedTarget(address.Host, ClusterClientDiscoverySpecConfig.HttpPorts[1]));

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(1);
}, _config.Client);

EnterBarrier("discovery-entry-updated");
}

private void ClusterClient_must_re_establish_on_cluster_restart()
{
RunOn(() =>
{
Within(TimeSpan.FromSeconds(5), () =>
AwaitAssert(() =>
{
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.Second).Address);
}, RemainingOrDefault);
});
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.Second).Address);
}, 10.Seconds());

_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true));
ExpectMsg<string>().Should().Be("hello");

FishForMessage(msg => msg is string).Should().Be("hello");
}, _config.Client);

EnterBarrier("re-establish-successful");
}

private void ClusterClient_must_simulate_a_cluster_forced_shutdown()
{
RunOn(() =>
{
AkkaManagement.Get(Sys).Stop().Wait();

// simulate a hard shutdown
TestConductor.Exit(_config.Second, 0).Wait();
}, _config.Client);

EnterBarrier("hard-shutdown-and-discovery-entry-updated");
}

Expand All @@ -253,6 +274,8 @@ private void ClusterClient_third_node_formed_a_cluster()
Join(_config.Third, _config.Third);
RunOn(() =>
{
AkkaManagement.Get(Sys).Start().Wait();

var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
Expand All @@ -263,33 +286,32 @@ private void ClusterClient_third_node_formed_a_cluster()
RunOn(() =>
{
var address = GetAddress(_config.Third);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
_discoveryService.TryAddEndpoint("test-cluster",
new ServiceDiscovery.ResolvedTarget(address.Host, ClusterClientDiscoverySpecConfig.HttpPorts[2]));

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(2);
}, _config.Client);

EnterBarrier("discovery-entry-updated");
}

private void ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown()
{
RunOn(() =>
{
Within(TimeSpan.FromSeconds(20), () =>
AwaitAssert(() =>
{
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.Third).Address);
}, TimeSpan.FromSeconds(20));
});
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.Third).Address);
}, 20.Seconds());

_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true));
ExpectMsg<string>().Should().Be("hello");

FishForMessage(msg => msg is string).Should().Be("hello");
}, _config.Client);

EnterBarrier("re-establish-successful");
}

Expand Down
Loading

0 comments on commit e4e4ca4

Please sign in to comment.