diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs index e15c9ac9ccb..4b6250782c6 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesNewExtractorSpec.cs @@ -63,6 +63,7 @@ class = ""Akka.Cluster.Sharding.Tests.MemoryJournalShared, Akka.Cluster.Sharding }} ")) .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) .WithFallback(Tools.Singleton.ClusterSingletonManager.DefaultConfig()) .WithFallback(MultiNodeClusterSpec.ClusterConfig()); @@ -177,7 +178,7 @@ protected ClusterShardingRememberEntitiesNewExtractorSpec(ClusterShardingRemembe EnterBarrier("startup"); } protected bool IsDDataMode { get; } - + protected override void AfterTermination() { base.AfterTermination(); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs index 7b776ca9ff1..62061fdedc7 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingRememberEntitiesSpec.cs @@ -63,6 +63,7 @@ class = ""Akka.Cluster.Sharding.Tests.MemoryJournalShared, Akka.Cluster.Sharding }} ")) .WithFallback(Sharding.ClusterSharding.DefaultConfig()) + .WithFallback(DistributedData.DistributedData.DefaultConfig()) .WithFallback(Tools.Singleton.ClusterSingletonManager.DefaultConfig()) .WithFallback(MultiNodeClusterSpec.ClusterConfig()); @@ -158,7 +159,7 @@ protected ClusterShardingRememberEntitiesSpec(ClusterShardingRememberEntitiesSpe } protected bool IsDDataMode { get; } - + protected override void AfterTermination() { base.AfterTermination(); diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs index 3a2ff20d60b..d18d2615d5c 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests.MultiNode/ClusterShardingSpec.cs @@ -77,7 +77,7 @@ protected ClusterShardingSpecConfig(string mode, string entityRecoveryStrategy = number-of-entities = 1 }} least-shard-allocation-strategy {{ - rebalance-threshold = 2 + rebalance-threshold = 1 max-simultaneous-rebalance = 1 }} distributed-data.durable.lmdb {{ @@ -468,7 +468,7 @@ private void CreateCoordinator() "coordinator", TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), - 0.1, + 0.1, -1).WithDeploy(Deploy.Local); Sys.ActorOf(ClusterSingletonManager.Props( diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingConfigSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingConfigSpec.cs index 18152aa79cc..21a9b5ce21f 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingConfigSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingConfigSpec.cs @@ -47,7 +47,7 @@ public void Should_cluster_sharding_settings_have_default_config() Assert.Equal("akka.cluster.singleton", config.GetString("coordinator-singleton")); Assert.Equal(string.Empty, 
config.GetString("use-dispatcher")); - Assert.Equal(10, config.GetInt("least-shard-allocation-strategy.rebalance-threshold")); + Assert.Equal(1, config.GetInt("least-shard-allocation-strategy.rebalance-threshold")); Assert.Equal(3, config.GetInt("least-shard-allocation-strategy.max-simultaneous-rebalance")); Assert.Equal("all", config.GetString("entity-recovery-strategy")); @@ -60,7 +60,7 @@ public void Should_cluster_sharding_settings_have_default_config() Assert.Equal("singleton", singletonConfig.GetString("singleton-name")); Assert.Equal(string.Empty, singletonConfig.GetString("role")); Assert.Equal(TimeSpan.FromSeconds(1), singletonConfig.GetTimeSpan("hand-over-retry-interval")); - Assert.Equal(10, singletonConfig.GetInt("min-number-of-hand-over-retries")); + Assert.Equal(15, singletonConfig.GetInt("min-number-of-hand-over-retries")); } } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs index bee6a932e0d..bac80564b98 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/ClusterShardingInternalsSpec.cs @@ -78,5 +78,40 @@ public void ClusterSharding_must_start_a_region_in_proxy_mode_in_case_of_node_ro region.Should().BeSameAs(proxy); } + + [Fact] + public void HandOffStopper_must_stop_the_entity_even_if_the_entity_doesnt_handle_handOffStopMessage() + { + + var probe = CreateTestProbe(); + var shardName = "test"; + var emptyHandlerActor = Sys.ActorOf(Props.Create(() => new EmptyHandlerActor())); + var handOffStopper = Sys.ActorOf( + Props.Create(() => new ShardRegion.HandOffStopper(shardName, probe.Ref, new IActorRef[] { emptyHandlerActor }, HandOffStopMessage.Instsnce, TimeSpan.FromMilliseconds(10))) + ); + + Watch(emptyHandlerActor); + ExpectTerminated(emptyHandlerActor, TimeSpan.FromSeconds(1)); + + probe.ExpectMsg(new PersistentShardCoordinator.ShardStopped(shardName), TimeSpan.FromSeconds(1)); + + Watch(handOffStopper); + ExpectTerminated(handOffStopper, TimeSpan.FromSeconds(1)); + } + + internal class HandOffStopMessage : INoSerializationVerificationNeeded + { + public static readonly HandOffStopMessage Instsnce = new HandOffStopMessage(); + private HandOffStopMessage() + { + } + } + + internal class EmptyHandlerActor : UntypedActor + { + protected override void OnReceive(object message) + { + } + } } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/CoordinatedShutdownShardingSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/CoordinatedShutdownShardingSpec.cs index 38b2f5ae966..7a42a1aa9cf 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/CoordinatedShutdownShardingSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/CoordinatedShutdownShardingSpec.cs @@ -105,12 +105,15 @@ protected override void BeforeTermination() /// private void PingEntities() { - _region2.Tell(1, _probe2.Ref); - _probe2.ExpectMsg(10.Seconds()).Should().Be(1); - _region2.Tell(2, _probe2.Ref); - _probe2.ExpectMsg(10.Seconds()).Should().Be(2); - _region2.Tell(3, _probe2.Ref); - _probe2.ExpectMsg(10.Seconds()).Should().Be(3); + AwaitAssert(() => + { + _region2.Tell(1, _probe2.Ref); + _probe2.ExpectMsg(1.Seconds()).Should().Be(1); + _region2.Tell(2, _probe2.Ref); + _probe2.ExpectMsg(1.Seconds()).Should().Be(2); + _region2.Tell(3, _probe2.Ref); + _probe2.ExpectMsg(1.Seconds()).Should().Be(3); + }, TimeSpan.FromSeconds(10)); } [Fact] diff --git 
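// Editor's note: the HandOffStopper test above relies on the receive-timeout escalation that
// this PR adds to ShardRegion.HandOffStopper (see the ShardRegion.cs hunks later in this diff).
// A minimal self-contained sketch of that pattern, assuming the usual Akka.Actor and
// System.Collections.Generic usings; illustrative only, not the PR's exact implementation:
internal class StopperSketch : ReceiveActor
{
    public StopperSketch(IEnumerable<IActorRef> entities, object stopMessage, TimeSpan timeout)
    {
        var remaining = new HashSet<IActorRef>(entities);
        Receive<Terminated>(t =>
        {
            remaining.Remove(t.ActorRef); // entities that honored stopMessage
            if (remaining.Count == 0)
                Context.Stop(Self);
        });
        Receive<ReceiveTimeout>(_ =>
        {
            // entities that ignored stopMessage are stopped forcefully, as the test expects
            foreach (var entity in remaining)
                Context.Stop(entity);
        });
        Context.SetReceiveTimeout(timeout);
        foreach (var entity in remaining)
        {
            Context.Watch(entity);
            entity.Tell(stopMessage);
        }
    }
}
diff --git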
a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/LeastShardAllocationStrategySpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/LeastShardAllocationStrategySpec.cs index 9a13cae9610..810ba650a23 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/LeastShardAllocationStrategySpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/LeastShardAllocationStrategySpec.cs @@ -13,11 +13,120 @@ using Akka.TestKit.Xunit2; using Xunit; using FluentAssertions; +using System.Collections; +using System; namespace Akka.Cluster.Sharding.Tests { public class LeastShardAllocationStrategySpec : TestKitBase { + /// + /// Test dictionary, will keep the order of items as they were added + /// Needed because scala Map is having similar behaviour and tests depends on it + /// + /// + /// + internal sealed class ImmutableDictionaryKeepOrder : IImmutableDictionary + { + public static readonly ImmutableDictionaryKeepOrder Empty = new ImmutableDictionaryKeepOrder(ImmutableDictionary.Empty, ImmutableList>.Empty); + + private readonly ImmutableDictionary _dictionary = ImmutableDictionary.Empty; + private readonly ImmutableList> _items = ImmutableList>.Empty; + + private ImmutableDictionaryKeepOrder(ImmutableDictionary dictionary, ImmutableList> items) + { + _dictionary = dictionary; + _items = items; + } + + public TValue this[TKey key] => _dictionary[key]; + + public IEnumerable Keys => _items.Select(i => i.Key); + + public IEnumerable Values => _items.Select(i => i.Value); + + public int Count => _dictionary.Count; + + public IImmutableDictionary Add(TKey key, TValue value) + { + return new ImmutableDictionaryKeepOrder( + _dictionary.Add(key, value), + _items.Add(new KeyValuePair(key, value)) + ); + } + + public IImmutableDictionary AddRange(IEnumerable> pairs) + { + return new ImmutableDictionaryKeepOrder( + _dictionary.AddRange(pairs), + _items.AddRange(pairs) + ); + } + + public IImmutableDictionary Clear() + { + return Empty; + } + + public bool Contains(KeyValuePair pair) + { + return _dictionary.Contains(pair); + } + + public bool ContainsKey(TKey key) + { + return _dictionary.ContainsKey(key); + } + + public IEnumerator> GetEnumerator() + { + return _items.GetEnumerator(); + } + + public IImmutableDictionary Remove(TKey key) + { + return new ImmutableDictionaryKeepOrder( + _dictionary.Remove(key), + _items.RemoveAll(i => EqualityComparer.Default.Equals(key, i.Key)) + ); + //throw new NotSupportedException(); + } + + public IImmutableDictionary RemoveRange(IEnumerable keys) + { + return new ImmutableDictionaryKeepOrder( + _dictionary.RemoveRange(keys), + _items.RemoveAll(i => keys.Any(j => EqualityComparer.Default.Equals(j, i.Key))) + ); + //throw new NotSupportedException(); + } + + public IImmutableDictionary SetItem(TKey key, TValue value) + { + throw new NotSupportedException(); + } + + public IImmutableDictionary SetItems(IEnumerable> items) + { + throw new NotSupportedException(); + } + + public bool TryGetKey(TKey equalKey, out TKey actualKey) + { + return _dictionary.TryGetKey(equalKey, out actualKey); + } + + public bool TryGetValue(TKey key, out TValue value) + { + return _dictionary.TryGetValue(key, out value); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return _items.GetEnumerator(); + } + } + private readonly IShardAllocationStrategy _allocationStrategy; private readonly IActorRef _regionA; private readonly IActorRef _regionB; @@ -32,95 +141,158 @@ public LeastShardAllocationStrategySpec() : base(new XunitAssertions(), "LeastSh _allocationStrategy = new 
LeastShardAllocationStrategy(3, 2); } + private IImmutableDictionary> CreateAllocations(int aCount, int bCount = 0, int cCount = 0) + { + var shards = Enumerable.Range(1, (aCount + bCount + cCount)).Select(i => i.ToString("000")); + + IImmutableDictionary> allocations = ImmutableDictionaryKeepOrder>.Empty; + allocations = allocations.Add(_regionA, shards.Take(aCount).ToImmutableList()); + allocations = allocations.Add(_regionB, shards.Skip(aCount).Take(bCount).ToImmutableList()); + allocations = allocations.Add(_regionC, shards.Skip(aCount + bCount).Take(cCount).ToImmutableList()); + return allocations; + } + [Fact] - public void LeastShardAllocationStrategy_should_allocate_to_region_with_least_number_of_shards() + public void LeastShardAllocationStrategy_must_allocate_to_region_with_least_number_of_shards() { - var allocations = new Dictionary> - { - {_regionA, new []{"shard1"}.ToImmutableList() }, - {_regionB, new []{"shard2"}.ToImmutableList() }, - {_regionC, ImmutableList.Empty } - }.ToImmutableDictionary(); + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 3, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 1, bCount: 1); + allocationStrategy.AllocateShard(_regionA, "003", allocations).Result.Should().Be(_regionC); + } - var result = _allocationStrategy.AllocateShard(_regionA, "shard3", allocations).Result; - result.Should().Be(_regionC); + [Fact] + public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_2_0_0_rebalanceThreshold_1() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 1, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 2); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("001"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001")).Result.Should().BeEmpty(); } [Fact] - public void LeastShardAllocationStrategy_should_rebalance_from_region_with_most_number_of_shards() + public void LeastShardAllocationStrategy_must_not_rebalance_when_diff_equal_to_threshold_1_1_0_rebalanceThreshold_1() { - var allocations = new Dictionary> - { - {_regionA, new []{"shard1"}.ToImmutableList() }, - {_regionB, new []{"shard2", "shard3"}.ToImmutableList() }, - {_regionC, ImmutableList.Empty } - }.ToImmutableDictionary(); + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 1, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 1, bCount: 1); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEmpty(); + } - // so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold - var r1 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result; - r1.Count.Should().Be(0); + [Fact] + public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_1_2_0_rebalanceThreshold_1() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 1, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 1, bCount: 2); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("002"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("002")).Result.Should().BeEmpty(); + } - allocations = allocations.SetItem(_regionB, new[] { "shard2", "shard3", "shard4" }.ToImmutableList()); - var r2 = 
_allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty).Result;
-            r2.Should().BeEquivalentTo(new[] { "shard2", "shard3" });
+        [Fact]
+        public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_3_0_0_rebalanceThreshold_1()
+        {
+            var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 1, maxSimultaneousRebalance: 10);
+            var allocations = CreateAllocations(aCount: 3);
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty).Result.Should().BeEquivalentTo("001");
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty.Add("001")).Result.Should().BeEquivalentTo("002");
+        }

-            var r3 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard4")).Result;
-            r3.Count.Should().Be(0);
+        [Fact]
+        public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_4_4_0_rebalanceThreshold_1()
+        {
+            var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 1, maxSimultaneousRebalance: 10);
+            var allocations = CreateAllocations(aCount: 4, bCount: 4);
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty).Result.Should().BeEquivalentTo("001");
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty.Add("001")).Result.Should().BeEquivalentTo("005");
+        }

-            allocations = allocations.SetItem(_regionA, new[] { "shard1", "shard5", "shard6" }.ToImmutableList());
-            var r4 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard1")).Result;
-            r4.Should().BeEquivalentTo(new[] { "shard2" });
+        [Fact]
+        public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_4_4_2_rebalanceThreshold_1()
+        {
+            var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 1, maxSimultaneousRebalance: 10);
+            var allocations = CreateAllocations(aCount: 4, bCount: 4, cCount: 2);
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty).Result.Should().BeEquivalentTo("001");
+            // not optimal, 005 stopped and started again, but ok
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty.Add("001")).Result.Should().BeEquivalentTo("005");
         }

         [Fact]
-        public void LeastShardAllocationStrategy_should_rebalance_multiple_shards_if_max_simultaneous_rebalances_is_not_exceeded()
+        public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_1_3_0_rebalanceThreshold_2()
         {
-            var allocations = new Dictionary<IActorRef, IImmutableList<string>>
-            {
-                {_regionA, new []{"shard1"}.ToImmutableList() },
-                {_regionB, new []{ "shard2", "shard3", "shard4", "shard5", "shard6" }.ToImmutableList() },
-                {_regionC, ImmutableList<string>.Empty}
-            }.ToImmutableDictionary();
+            var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 2, maxSimultaneousRebalance: 10);
+            var allocations = CreateAllocations(aCount: 1, bCount: 2);

-            var r1 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty).Result;
-            r1.Should().BeEquivalentTo(new[] { "shard2", "shard3" });
+            // so far regionB has 2 shards and regionC has 0 shards, but the diff is <= rebalanceThreshold
+            allocationStrategy.Rebalance(allocations, ImmutableHashSet<string>.Empty).Result.Should().BeEmpty();

-            var r2 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard2", "shard3")).Result;
-            r2.Count.Should().Be(0);
+            var allocations2 = CreateAllocations(aCount: 1, bCount: 3);
+            allocationStrategy.Rebalance(allocations2, ImmutableHashSet<string>.Empty).Result.Should().BeEquivalentTo("002");
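// Editor's note: a worked example of the picking rule that the rewritten
// LeastShardAllocationStrategy.Rebalance (later in this diff) applies. The number of shards
// rebalanced in one pass is capped both by the threshold and by the rebalances in flight:
static int ShardsToRebalance(int difference, int rebalanceThreshold, int maxSimultaneousRebalance, int inProgress) =>
    Math.Min(Math.Min(difference - rebalanceThreshold, rebalanceThreshold),
             maxSimultaneousRebalance - inProgress);
// For the [1, 3, 0] case above: difference = 3 - 0 = 3, so
// ShardsToRebalance(3, 2, 10, 0) == 1, and the lowest-ordered shard "002" is picked.
+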
allocationStrategy.Rebalance(allocations2, ImmutableHashSet.Empty.Add("002")).Result.Should().BeEmpty(); } [Fact] - public void LeastShardAllocationStrategy_should_limit_number_of_simultaneous_rebalances() + public void LeastShardAllocationStrategy_must_not_rebalance_when_diff_equal_to_threshold_2_2_0_rebalanceThreshold_2() { - var allocations = new Dictionary> - { - {_regionA, new []{"shard1"}.ToImmutableList() }, - {_regionB, new []{ "shard2", "shard3", "shard4", "shard5", "shard6" }.ToImmutableList() }, - {_regionC, ImmutableList.Empty} - }.ToImmutableDictionary(); + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 2, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 2, bCount: 2); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEmpty(); + } - var r1 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard2")).Result; - r1.Should().BeEquivalentTo(new[] { "shard3" }); + [Fact] + public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_3_3_0_rebalanceThreshold_2() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 2, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 3, bCount: 3); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("001"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001")).Result.Should().BeEquivalentTo("004"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("004")).Result.Should().BeEmpty(); + } - var r2 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard2", "shard3")).Result; - r2.Count.Should().Be(0); + [Fact] + public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_4_4_0_rebalanceThreshold_2() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 2, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 4, bCount: 4); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("001", "002"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("002")).Result.Should().BeEquivalentTo("005", "006"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("002").Add("005").Add("006")).Result.Should().BeEmpty(); } [Fact] - public void LeastShardAllocationStrategy_dont_rebalance_excessive_shards_if_maxSimultaneousRebalance_gt_rebalanceThreshold() + public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_5_5_0_rebalanceThreshold_2() { - var allocationStrategy = new LeastShardAllocationStrategy(2, 5); - var allocations = new Dictionary> - { - {_regionA, new []{"shard1", "shard2", "shard3", "shard4", "shard5", "shard6", "shard7", "shard8"}.ToImmutableList() }, - {_regionB, new []{"shard9", "shard10", "shard11", "shard12" }.ToImmutableList() } - }.ToImmutableDictionary(); + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 2, maxSimultaneousRebalance: 10); + var allocations = CreateAllocations(aCount: 5, bCount: 5); + // optimal would => [4, 4, 2] or even => [3, 4, 3] + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("001", "002"); + // if 001 and 002 are not started quickly enough this is stopping more than optimal + 
allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("002")).Result.Should().BeEquivalentTo("006", "007"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("002").Add("006").Add("007")).Result.Should().BeEquivalentTo("003"); + } - var r1 = allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard2")).Result; - r1.Should().BeEquivalentTo(new[] { "shard1", "shard3", "shard4" }); + [Fact] + public void LeastShardAllocationStrategy_must_rebalance_from_region_with_most_number_of_shards_50_50_0_rebalanceThreshold_2() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 2, maxSimultaneousRebalance: 100); + var allocations = CreateAllocations(aCount: 50, bCount: 50); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("001", "002"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("002")).Result.Should().BeEquivalentTo("051", "052"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("001").Add("002").Add("051").Add("052")).Result.Should().BeEquivalentTo("003", "004"); + } - var r2 = _allocationStrategy.Rebalance(allocations, ImmutableHashSet.Create("shard5", "shard6", "shard7", "shard8")).Result; - r2.Count.Should().Be(0); + [Fact] + public void LeastShardAllocationStrategy_must_limit_number_of_simultaneous_rebalance() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 3, maxSimultaneousRebalance: 2); + var allocations = CreateAllocations(aCount: 1, bCount: 10); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty).Result.Should().BeEquivalentTo("002", "003"); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("002").Add("003")).Result.Should().BeEmpty(); + } + + [Fact] + public void LeastShardAllocationStrategy_must_not_pick_shards_that_are_in_progress() + { + var allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold: 3, maxSimultaneousRebalance: 4); + var allocations = CreateAllocations(aCount: 10); + allocationStrategy.Rebalance(allocations, ImmutableHashSet.Empty.Add("002").Add("003")).Result.Should().BeEquivalentTo("001", "004"); } } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs new file mode 100644 index 00000000000..fa203365070 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Sharding.Tests/PersistentShardSpec.cs @@ -0,0 +1,88 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. 
+// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Cluster.Sharding.Tests +{ + public class PersistentShardSpec : AkkaSpec + { + private static readonly Config SpecConfig; + + internal class EntityActor : UntypedActor + { + private readonly string id; + + public EntityActor(string id) + { + this.id = id; + } + + protected override void OnReceive(object message) + { + } + } + + static PersistentShardSpec() + { + SpecConfig = ConfigurationFactory.ParseString(@" + akka.loglevel = DEBUG + akka.actor.provider = cluster + akka.persistence.journal.plugin = ""akka.persistence.journal.inmem"" + akka.remote.dot-netty.tcp.port = 0") + .WithFallback(ClusterSingletonManager.DefaultConfig() + .WithFallback(ClusterSharding.DefaultConfig())); + } + + public PersistentShardSpec(ITestOutputHelper helper) : base(SpecConfig, helper) + { + } + + [Fact] + public void Persistent_Shard_must_remember_entities_started_with_StartEntity() + { + Func ep = id => Props.Create(() => new EntityActor(id)); + + var props = Props.Create(() => new PersistentShard( + "cats", + "shard-1", + ep, + ClusterShardingSettings.Create(Sys), + _ => Tuple.Create("entity-1", (object)"msg"), + _ => "shard-1", + PoisonPill.Instance + )); + + var persistentShard = Sys.ActorOf(props); + Watch(persistentShard); + + persistentShard.Tell(new ShardRegion.StartEntity("entity-1")); + ExpectMsg(new ShardRegion.StartEntityAck("entity-1", "shard-1")); + + persistentShard.Tell(PoisonPill.Instance); + ExpectTerminated(persistentShard); + + Sys.Log.Info("Starting shard again"); + var secondIncarnation = Sys.ActorOf(props); + + secondIncarnation.Tell(Shard.GetShardStats.Instance); + AwaitAssert(() => + { + ExpectMsgAllOf(new Shard.ShardStats("shard-1", 1)); + }); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs index c9cda07a2c5..f6fd7eabe9a 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ClusterSharding.cs @@ -1109,7 +1109,7 @@ public IActorRef ShardRegionProxy(string typeName) throw new ArgumentException($"Shard type [{typeName}] must be started first"); } - private IShardAllocationStrategy DefaultShardAllocationStrategy(ClusterShardingSettings settings) + public IShardAllocationStrategy DefaultShardAllocationStrategy(ClusterShardingSettings settings) { return new LeastShardAllocationStrategy( Settings.TunningParameters.LeastShardAllocationRebalanceThreshold, diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs index ebe8bee28ca..6fd40ccdbf3 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs @@ -45,7 +45,7 @@ internal sealed class DDataShard : ActorBase, IShard, IWithUnboundedStash public Shard.ShardState State { get; set; } = Shard.ShardState.Empty; public ImmutableDictionary RefById { get; set; } = ImmutableDictionary.Empty; public ImmutableDictionary IdByRef { get; set; } = ImmutableDictionary.Empty; - public ImmutableDictionary LastMessageTimestamp { get; set; } + public ImmutableDictionary 
LastMessageTimestamp { get; set; } = ImmutableDictionary<EntityId, long>.Empty;
         public ImmutableHashSet<IActorRef> Passivating { get; set; } = ImmutableHashSet<IActorRef>.Empty;
         public ImmutableDictionary<EntityId, ImmutableList<Tuple<object, IActorRef>>> MessageBuffers { get; set; } = ImmutableDictionary<EntityId, ImmutableList<Tuple<object, IActorRef>>>.Empty;
         public ICancelable PassivateIdleTask { get; }
diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs
index 58cdfeb86c5..ac4beae3c86 100644
--- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs
+++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs
@@ -96,20 +96,7 @@ protected override bool ReceiveCommand(object message)
             {
                 case SaveSnapshotSuccess m:
                     Log.Debug("PersistentShard snapshot saved successfully");
-                    /*
-                     * delete old events but keep the latest around because
-                     *
-                     * it's not safe to delete all events immediately because snapshots are typically stored with a weaker consistency
-                     * level which means that a replay might "see" the deleted events before it sees the stored snapshot,
-                     * i.e. it will use an older snapshot and then not replay the full sequence of events
-                     *
-                     * for debugging if something goes wrong in production it's very useful to be able to inspect the events
-                     */
-                    var deleteToSequenceNr = m.Metadata.SequenceNr - Settings.TunningParameters.KeepNrOfBatches * Settings.TunningParameters.SnapshotAfter;
-                    if (deleteToSequenceNr > 0)
-                    {
-                        DeleteMessages(deleteToSequenceNr);
-                    }
+                    InternalDeleteMessagesBeforeSnapshot(m, Settings.TunningParameters.KeepNrOfBatches, Settings.TunningParameters.SnapshotAfter);
                     break;
                 case SaveSnapshotFailure m:
                     Log.Warning("PersistentShard snapshot failure: [{0}]", m.Cause.Message);
@@ -221,7 +208,7 @@ public void DeliverTo(string id, object message, object payload, IActorRef sende
                     {
                         throw new InvalidOperationException($"Message buffers contains id [{id}].");
                     }
-                    this.GetEntity(id).Tell(payload, sender);
+                    this.GetOrCreateEntity(id).Tell(payload, sender);
                 }
                 else
                 {
@@ -231,7 +218,10 @@ public void DeliverTo(string id, object message, object payload, IActorRef sende
                 }
             }
             else
+            {
+                this.TouchLastMessageTimestamp(id);
                 child.Tell(payload, sender);
+            }
         }
     }
 }
diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs
index c3fe4348232..314ce460571 100644
--- a/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs
+++ b/src/contrib/cluster/Akka.Cluster.Sharding/PersistentShardCoordinator.cs
@@ -1233,7 +1233,7 @@ private StateInitialized() { }
         }
         #endregion
-
+
         /// <summary>
         /// Factory method for the <see cref="Actor.Props"/> of the actor.
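// Editor's note: both PersistentShard and PersistentShardCoordinator now delegate the
// event trimming above to InternalDeleteMessagesBeforeSnapshot. A sketch of the logic the
// removed inline block implemented (illustrative; the shared helper presumably lives on the
// persistence base class, and this is not its source):
void DeleteOldEventsKeepingLatestBatches(SaveSnapshotSuccess m, int keepNrOfBatches, int snapshotAfter)
{
    // keep the last keepNrOfBatches * snapshotAfter events: snapshots are typically stored
    // with weaker consistency, so a replay might see deleted events before the new snapshot;
    // keeping a window of events also leaves something to inspect when debugging production
    var deleteToSequenceNr = m.Metadata.SequenceNr - keepNrOfBatches * snapshotAfter;
    if (deleteToSequenceNr > 0)
        DeleteMessages(deleteToSequenceNr); // PersistentActor API
}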
/// @@ -1241,7 +1241,7 @@ private StateInitialized() { } /// TBD /// TBD /// TBD - internal static Props Props(string typeName, ClusterShardingSettings settings, IShardAllocationStrategy allocationStrategy) => + internal static Props Props(string typeName, ClusterShardingSettings settings, IShardAllocationStrategy allocationStrategy) => Actor.Props.Create(() => new PersistentShardCoordinator(typeName, settings, allocationStrategy)).WithDeploy(Deploy.Local); public Cluster Cluster { get; } = Cluster.Get(Context.System); @@ -1414,20 +1414,7 @@ private bool HandleSnapshotResult(object message) { case SaveSnapshotSuccess m: Log.Debug("Persistent snapshot saved successfully"); - /* - * delete old events but keep the latest around because - * - * it's not safe to delete all events immediate because snapshots are typically stored with a weaker consistency - * level which means that a replay might "see" the deleted events before it sees the stored snapshot, - * i.e. it will use an older snapshot and then not replay the full sequence of events - * - * for debugging if something goes wrong in production it's very useful to be able to inspect the events - */ - var deleteToSequenceNr = m.Metadata.SequenceNr - Settings.TunningParameters.KeepNrOfBatches * Settings.TunningParameters.SnapshotAfter; - if (deleteToSequenceNr > 0) - { - DeleteMessages(deleteToSequenceNr); - } + InternalDeleteMessagesBeforeSnapshot(m, Settings.TunningParameters.KeepNrOfBatches, Settings.TunningParameters.SnapshotAfter); break; case SaveSnapshotFailure m: diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs index a6ebbb4d250..c4b5aada690 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs @@ -500,7 +500,7 @@ private static void HandleShardCommand(this TShard shard, Shard.IShardCo switch (message) { case Shard.RestartEntity restartEntity: - shard.GetEntity(restartEntity.EntityId); + shard.GetOrCreateEntity(restartEntity.EntityId); break; case Shard.RestartEntities restartEntities: shard.HandleRestartEntities(restartEntities.Entries); @@ -510,13 +510,24 @@ private static void HandleShardCommand(this TShard shard, Shard.IShardCo private static void HandleStartEntity(this TShard shard, ShardRegion.StartEntity start) where TShard : IShard { - shard.Log.Debug("Got a request from [{0}] to start entity [{1}] in shard [{2}]", shard.Sender, start.EntityId, shard.ShardId); - if (shard.PassivateIdleTask != null) + var requester = shard.Sender; + shard.Log.Debug("Got a request from [{0}] to start entity [{1}] in shard [{2}]", requester, start.EntityId, shard.ShardId); + shard.TouchLastMessageTimestamp(start.EntityId); + + if (shard.State.Entries.Contains(start.EntityId)) { - shard.LastMessageTimestamp = shard.LastMessageTimestamp.SetItem(start.EntityId, DateTime.Now.Ticks); + shard.GetOrCreateEntity(start.EntityId); + requester.Tell(new ShardRegion.StartEntityAck(start.EntityId, shard.ShardId)); } - shard.GetEntity(start.EntityId); - shard.Context.Sender.Tell(new ShardRegion.StartEntityAck(start.EntityId, shard.ShardId)); + else + { + shard.ProcessChange(new Shard.EntityStarted(start.EntityId), e => + { + shard.GetOrCreateEntity(start.EntityId); + shard.SendMessageBuffer(e); + requester.Tell(new ShardRegion.StartEntityAck(start.EntityId, shard.ShardId)); + }); + }; } private static void HandleStartEntityAck(this TShard shard, ShardRegion.StartEntityAck ack) where TShard : IShard @@ -570,8 +581,11 @@ private 
static void HandOff(this TShard shard, IActorRef replyTo) where if (shard.State.Entries.Count != 0) { + var entityHandOffTimeout = (shard.Settings.TunningParameters.HandOffTimeout - TimeSpan.FromSeconds(5)); + if (entityHandOffTimeout < TimeSpan.FromSeconds(1)) + entityHandOffTimeout = TimeSpan.FromSeconds(1); shard.HandOffStopper = shard.Context.Watch(shard.Context.ActorOf( - ShardRegion.HandOffStopper.Props(shard.ShardId, replyTo, shard.IdByRef.Keys, shard.HandOffStopMessage))); + ShardRegion.HandOffStopper.Props(shard.ShardId, replyTo, shard.IdByRef.Keys, shard.HandOffStopMessage, entityHandOffTimeout))); //During hand off we only care about watching for termination of the hand off stopper shard.Context.Become(message => @@ -624,6 +638,14 @@ private static void Passivate(this TShard shard, IActorRef entity, objec } } + public static void TouchLastMessageTimestamp(this TShard shard, EntityId id) where TShard : IShard + { + if (shard.PassivateIdleTask != null) + { + shard.LastMessageTimestamp = shard.LastMessageTimestamp.SetItem(id, DateTime.Now.Ticks); + } + } + private static void PassivateIdleEntities(this TShard shard) where TShard : IShard { var idleEntitiesCount = 0; @@ -658,7 +680,7 @@ public static void SendMessageBuffer(this TShard shard, Shard.EntityStar { shard.Log.Debug("Sending message buffer for entity [{0}] ([{1}] messages)", id, buffer.Count); - shard.GetEntity(id); + shard.GetOrCreateEntity(id); // Now there is no deliveryBuffer we can try to redeliver // and as the child exists, the message will be directly forwarded @@ -730,21 +752,11 @@ public static void BaseEntityTerminated(this TShard shard, IActorRef tre internal static void BaseDeliverTo(this TShard shard, string id, object message, object payload, IActorRef sender) where TShard : IShard { - if (shard.PassivateIdleTask != null) - { - shard.LastMessageTimestamp = shard.LastMessageTimestamp.SetItem(id, DateTime.Now.Ticks); - } - - var name = Uri.EscapeDataString(id); - var child = shard.Context.Child(name); - - if (Equals(child, ActorRefs.Nobody)) - shard.GetEntity(id).Tell(payload, sender); - else - child.Tell(payload, sender); + shard.TouchLastMessageTimestamp(id); + shard.GetOrCreateEntity(id).Tell(payload, sender); } - internal static IActorRef GetEntity(this TShard shard, string id) where TShard : IShard + internal static IActorRef GetOrCreateEntity(this TShard shard, string id, Action onCreate = null) where TShard : IShard { var name = Uri.EscapeDataString(id); var child = shard.Context.Child(name).GetOrElse(() => @@ -754,11 +766,9 @@ internal static IActorRef GetEntity(this TShard shard, string id) where var a = shard.Context.Watch(shard.Context.ActorOf(shard.EntityProps(id), name)); shard.IdByRef = shard.IdByRef.SetItem(a, id); shard.RefById = shard.RefById.SetItem(id, a); - if (shard.PassivateIdleTask != null) - { - shard.LastMessageTimestamp = shard.LastMessageTimestamp.SetItem(id, DateTime.Now.Ticks); - } + shard.TouchLastMessageTimestamp(id); shard.State = new Shard.ShardState(shard.State.Entries.Add(id)); + onCreate?.Invoke(a); return a; }); @@ -846,6 +856,8 @@ private void SendStart(IImmutableSet ids) { foreach (var id in ids) { + // these go through the region rather the directly to the shard + // so that shard mapping changes are picked up _region.Tell(new ShardRegion.StartEntity(id)); } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs index 45ef7772d89..cac16f7c951 100644 --- 
a/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs
+++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardAllocationStrategy.cs
@@ -53,11 +53,28 @@ public interface IShardAllocationStrategy : INoSerializationVerificationNeeded
     /// <summary>
     /// The default implementation of <see cref="IShardAllocationStrategy"/> allocates new shards
-    /// to the <see cref="ShardRegion"/> with the least number of previously allocated shards. It picks shards
-    /// for rebalancing handoff from the <see cref="ShardRegion"/> with the most number of previously allocated shards.
+    /// to the <see cref="ShardRegion"/> with the least number of previously allocated shards.
+    ///
+    /// When a node is removed from the cluster, its shards will be started on the remaining nodes,
+    /// evenly spread on the remaining nodes (by picking regions with least shards).
+    ///
+    /// When a node is added to the cluster the shards on the existing nodes will be rebalanced to the new node.
+    /// It picks shards for rebalancing from the `ShardRegion` with the most number of previously allocated shards.
+    ///
     /// They will then be allocated to the <see cref="ShardRegion"/> with the least number of previously allocated shards,
     /// i.e. new members in the cluster. There is a configurable threshold of how large the difference
-    /// must be to begin the rebalancing. The number of ongoing rebalancing processes can be limited.
+    /// must be to begin the rebalancing. The difference between the number of shards in the region with the most shards
+    /// and the region with the least shards must be greater than the `rebalanceThreshold` for a rebalance to occur.
+    ///
+    /// A `rebalanceThreshold` of 1 gives the best distribution and is therefore typically the best choice.
+    /// A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one.
+    /// That has the advantage that the rebalance process can be quicker, but the drawback that the
+    /// number of shards (and therefore load) on different nodes may differ significantly.
+    /// Given the recommendation of using 10x as many shards as nodes, a `rebalanceThreshold` of 10 can result
+    /// in one node hosting ~2 times the number of shards of other nodes. Example: 1000 shards on 100 nodes means
+    /// 10 shards per node. One node may have 19 shards and others 10 without a rebalance occurring.
+    ///
+    /// The number of ongoing rebalancing processes can be limited by `maxSimultaneousRebalance`.
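// Editor's note: a small configuration sketch tying this doc to the HOCON keys asserted in
// ClusterShardingConfigSpec earlier in this diff (the values shown are the new defaults per
// those assertions; treat the snippet itself as illustrative):
var config = ConfigurationFactory.ParseString(@"
    akka.cluster.sharding.least-shard-allocation-strategy {
        rebalance-threshold = 1          # best distribution, shards move one at a time
        max-simultaneous-rebalance = 3   # cap on rebalances in flight
    }");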
/// [Serializable] public class LeastShardAllocationStrategy : IShardAllocationStrategy @@ -85,7 +102,7 @@ public LeastShardAllocationStrategy(int rebalanceThreshold, int maxSimultaneousR /// TBD public Task AllocateShard(IActorRef requester, string shardId, IImmutableDictionary> currentShardAllocations) { - var min = GetMinBy(currentShardAllocations, kv => kv.Value.Count); + var min = currentShardAllocations.OrderBy(i => i.Value.Count).FirstOrDefault(); return Task.FromResult(min.Key); } @@ -99,51 +116,21 @@ public Task> Rebalance(IImmutableDictionary kv.Value.Count); - var shards = - currentShardAllocations.Select(kv => kv.Value.Where(s => !rebalanceInProgress.Contains(s)).ToArray()); - var mostShards = GetMaxBy(shards, x => x.Length); + var leastShardsRegion = currentShardAllocations.OrderBy(i => i.Value.Count).FirstOrDefault(); + var mostShards = currentShardAllocations.Select(kv => kv.Value.Where(s => !rebalanceInProgress.Contains(s))).OrderByDescending(i => i.Count()).FirstOrDefault()?.ToArray(); var difference = mostShards.Length - leastShardsRegion.Value.Count; if (difference >= _rebalanceThreshold) { - return Task.FromResult>(mostShards.Take(Math.Min(difference, _maxSimultaneousRebalance - rebalanceInProgress.Count)).ToImmutableHashSet()); - } - } - - return Task.FromResult>(ImmutableHashSet.Empty); - } + var n = Math.Min( + Math.Min(difference - _rebalanceThreshold, _rebalanceThreshold), + _maxSimultaneousRebalance - rebalanceInProgress.Count); - private static T GetMinBy(IEnumerable collection, Func extractor) - { - var minSize = int.MaxValue; - var result = default(T); - foreach (var value in collection) - { - var x = extractor(value); - if (x < minSize) - { - minSize = x; - result = value; + return Task.FromResult>(mostShards.OrderBy(i => i).Take(n).ToImmutableHashSet()); } } - return result; - } - private static T GetMaxBy(IEnumerable collection, Func extractor) - { - var minSize = int.MinValue; - var result = default(T); - foreach (var value in collection) - { - var x = extractor(value); - if (x > minSize) - { - minSize = x; - result = value; - } - } - return result; + return Task.FromResult>(ImmutableHashSet.Empty); } } } diff --git a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs index b78dfe12471..e2e1e4970b7 100644 --- a/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs +++ b/src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs @@ -175,6 +175,12 @@ public override int GetHashCode() /// internal class HandOffStopper : ReceiveActor { + private ILoggingAdapter _log; + /// + /// TBD + /// + public ILoggingAdapter Log { get { return _log ?? (_log = Context.GetLogger()); } } + /// /// TBD /// @@ -183,22 +189,30 @@ internal class HandOffStopper : ReceiveActor /// TBD /// TBD /// TBD - public static Props Props(ShardId shard, IActorRef replyTo, IEnumerable entities, object stopMessage) + public static Props Props(ShardId shard, IActorRef replyTo, IEnumerable entities, object stopMessage, TimeSpan handoffTimeout) { - return Actor.Props.Create(() => new HandOffStopper(shard, replyTo, entities, stopMessage)).WithDeploy(Deploy.Local); + return Actor.Props.Create(() => new HandOffStopper(shard, replyTo, entities, stopMessage, handoffTimeout)).WithDeploy(Deploy.Local); } /// - /// TBD + ///Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of + /// them have terminated it replies with `ShardStopped`. 
+ /// If the entities don't terminate after `handoffTimeout` it will try stopping them forcefully. /// /// TBD /// TBD /// TBD /// TBD - public HandOffStopper(ShardId shard, IActorRef replyTo, IEnumerable entities, object stopMessage) + public HandOffStopper(ShardId shard, IActorRef replyTo, IEnumerable entities, object stopMessage, TimeSpan handoffTimeout) { var remaining = new HashSet(entities); + Receive(t => + { + Log.Warning("HandOffStopMessage[{0}] is not handled by some of the entities of the [{1}] shard, stopping the remaining entities.", stopMessage.GetType(), shard); + foreach (var r in remaining) + Context.Stop(r); + }); Receive(t => { remaining.Remove(t.ActorRef); @@ -209,6 +223,8 @@ public HandOffStopper(ShardId shard, IActorRef replyTo, IEnumerable e } }); + Context.SetReceiveTimeout(handoffTimeout); + foreach (var aref in remaining) { Context.Watch(aref); @@ -440,7 +456,7 @@ protected override void PreStart() Cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent)); if (Settings.PassivateIdleEntityAfter > TimeSpan.Zero && !Settings.RememberEntities) { - Log.Info($"Idle entities will be passivated after [{Settings.PassivateIdleEntityAfter}]"); + Log.Info($"Idle entities will be passivated after [{Settings.PassivateIdleEntityAfter}]"); } } @@ -675,22 +691,27 @@ private void HandleShardRegionCommand(IShardRegionCommand command) switch (command) { case Retry _: + SendGracefulShutdownToCoordinator(); + if (ShardBuffers.Count != 0) _retryCount++; if (_coordinator == null) Register(); else { - SendGracefulShutdownToCoordinator(); RequestShardBufferHomes(); - TryCompleteGracefulShutdown(); } + + TryCompleteGracefulShutdown(); + break; + case GracefulShutdown _: Log.Debug("Starting graceful shutdown of region and all its shards"); GracefulShutdownInProgress = true; SendGracefulShutdownToCoordinator(); TryCompleteGracefulShutdown(); break; + default: Unhandled(command); break; @@ -984,6 +1005,14 @@ private void HandleClusterEvent(ClusterEvent.IClusterDomainEvent e) ChangeMembers(MembersByAge.Remove(m)); } break; + + case ClusterEvent.MemberDowned md: + if (md.Member.UniqueAddress == Cluster.SelfUniqueAddress) + { + Context.Stop(Self); + } + Log.Info("Self downed, stopping ShardRegion [{0}]", Self.Path); + break; case ClusterEvent.IMemberEvent _: // these are expected, no need to warn about them break; diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerDownedSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerDownedSpec.cs new file mode 100644 index 00000000000..c76491f25bf --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/Singleton/ClusterSingletonManagerDownedSpec.cs @@ -0,0 +1,179 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. 
+// Copyright (C) 2013-2018 .NET Foundation <https://github.com/akkadotnet/akka.net>
+// </copyright>
+//-----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using Akka.Actor;
+using Akka.Cluster.TestKit;
+using Akka.Cluster.Tools.Singleton;
+using Akka.Configuration;
+using Akka.Remote.TestKit;
+using Akka.Remote.Transport;
+using FluentAssertions;
+
+namespace Akka.Cluster.Tools.Tests.MultiNode.Singleton
+{
+    public class ClusterSingletonManagerDownedSpecConfig : MultiNodeConfig
+    {
+        public RoleName First { get; }
+        public RoleName Second { get; }
+        public RoleName Third { get; }
+
+        public ClusterSingletonManagerDownedSpecConfig()
+        {
+            First = Role("first");
+            Second = Role("second");
+            Third = Role("third");
+
+            CommonConfig = ConfigurationFactory.ParseString(@"
+                akka.loglevel = INFO
+                akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
+                akka.remote.log-remote-lifecycle-events = off
+                akka.cluster.auto-down-unreachable-after = off
+            ")
+            .WithFallback(ClusterSingletonManager.DefaultConfig())
+            .WithFallback(ClusterSingletonProxy.DefaultConfig())
+            .WithFallback(MultiNodeClusterSpec.ClusterConfig());
+        }
+
+        internal class EchoStarted
+        {
+            public static readonly EchoStarted Instance = new EchoStarted();
+            private EchoStarted()
+            {
+            }
+        }
+
+        internal class EchoStopped
+        {
+            public static readonly EchoStopped Instance = new EchoStopped();
+            private EchoStopped()
+            {
+            }
+        }
+
+        /// <summary>
+        /// The singleton actor
+        /// </summary>
+        internal class Echo : UntypedActor
+        {
+            private readonly IActorRef _testActorRef;
+
+            public Echo(IActorRef testActorRef)
+            {
+                _testActorRef = testActorRef;
+                _testActorRef.Tell(EchoStarted.Instance);
+            }
+
+            protected override void PostStop()
+            {
+                _testActorRef.Tell(EchoStopped.Instance);
+            }
+
+            public static Props Props(IActorRef testActorRef)
+                => Actor.Props.Create(() => new Echo(testActorRef));
+
+            protected override void OnReceive(object message)
+            {
+                Sender.Tell(message);
+            }
+        }
+    }
+
+    public class ClusterSingletonManagerDownedSpec : MultiNodeClusterSpec
+    {
+        private readonly ClusterSingletonManagerDownedSpecConfig _config;
+        private readonly Lazy<IActorRef> _echoProxy;
+
+        protected override int InitialParticipantsValueFactory => Roles.Count;
+
+        public ClusterSingletonManagerDownedSpec() : this(new ClusterSingletonManagerDownedSpecConfig())
+        {
+        }
+
+        protected ClusterSingletonManagerDownedSpec(ClusterSingletonManagerDownedSpecConfig config) : base(config, typeof(ClusterSingletonManagerDownedSpec))
+        {
+            _config = config;
+
+            _echoProxy = new Lazy<IActorRef>(() => Watch(Sys.ActorOf(ClusterSingletonProxy.Props(
+                singletonManagerPath: "/user/echo",
+                settings: ClusterSingletonProxySettings.Create(Sys)),
+                name: "echoProxy")));
+        }
+
+        private void Join(RoleName from, RoleName to)
+        {
+            RunOn(() =>
+            {
+                Cluster.Join(Node(to).Address);
+                CreateSingleton();
+            }, from);
+        }
+
+        private IActorRef CreateSingleton()
+        {
+            return Sys.ActorOf(ClusterSingletonManager.Props(
+                singletonProps: ClusterSingletonManagerDownedSpecConfig.Echo.Props(TestActor),
+                terminationMessage: PoisonPill.Instance,
+                settings: ClusterSingletonManagerSettings.Create(Sys)),
+                name: "echo");
+        }
+
+        [MultiNodeFact]
+        public void ClusterSingletonManagerDownedSpecs()
+        {
+            ClusterSingletonManager_downing_must_startup_3_node();
+            ClusterSingletonManager_downing_must_stop_instance_when_member_is_downed();
+        }
+
+        private void ClusterSingletonManager_downing_must_startup_3_node()
+        {
+            Join(_config.First, _config.First);
+            Join(_config.Second, _config.First);
+            Join(_config.Third, _config.First);
+
+            Within(15.Seconds(), () =>
+            {
+                AwaitAssert(() => 
Cluster.State.Members.Count(m => m.Status == MemberStatus.Up).Should().Be(3)); + }); + + RunOn(() => + { + ExpectMsg(ClusterSingletonManagerDownedSpecConfig.EchoStarted.Instance); + }, _config.First); + + EnterBarrier("started"); + } + + private void ClusterSingletonManager_downing_must_stop_instance_when_member_is_downed() + { + RunOn(() => + { + TestConductor.Blackhole(_config.First, _config.Third, ThrottleTransportAdapter.Direction.Both).Wait(); + TestConductor.Blackhole(_config.Second, _config.Third, ThrottleTransportAdapter.Direction.Both).Wait(); + + Within(15.Seconds(), () => + { + AwaitAssert(() => Cluster.State.Unreachable.Count.Should().Be(1)); + }); + }, _config.First); + + EnterBarrier("blackhole-1"); + + RunOn(() => + { + // another blackhole so that second can't mark gossip as seen and thereby deferring shutdown of first + TestConductor.Blackhole(_config.First, _config.Second, ThrottleTransportAdapter.Direction.Both).Wait(); + Cluster.Down(Node(_config.Second).Address); + Cluster.Down(Cluster.SelfAddress); + // singleton instance stopped, before failure detection of first-second + ExpectMsg(TimeSpan.FromSeconds(3)); + }, _config.First); + + EnterBarrier("stopped"); + } + } +} diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj index e3accb2e180..7163dffda84 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Akka.Cluster.Tools.Tests.csproj @@ -34,4 +34,4 @@ $(DefineConstants);RELEASE - \ No newline at end of file + diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs index 8f111b975b9..95a67c26661 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonConfigSpec.cs @@ -36,7 +36,7 @@ public void ClusterSingletonManagerSettings_must_have_default_config() clusterSingletonManagerSettings.RemovalMargin.TotalSeconds.ShouldBe(0); var config = Sys.Settings.Config.GetConfig("akka.cluster.singleton"); - config.GetInt("min-number-of-hand-over-retries").ShouldBe(10); + config.GetInt("min-number-of-hand-over-retries").ShouldBe(15); } [Fact] diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeavingSpeedSpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeavingSpeedSpec.cs new file mode 100644 index 00000000000..b28d7b783bb --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonLeavingSpeedSpec.cs @@ -0,0 +1,168 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2018 Lightbend Inc. 
+// Copyright (C) 2013-2018 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Diagnostics; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.Tools.Singleton; +using Akka.Configuration; +using Akka.TestKit; +using Akka.TestKit.TestActors; +using FluentAssertions; +using Xunit; + +namespace Akka.Cluster.Tools.Tests.Singleton +{ + public class ClusterSingletonLeavingSpeedSpec : AkkaSpec + { + + internal class TheSingleton : UntypedActor + { + private readonly IActorRef probe; + + public static Props props(IActorRef probe) => Props.Create(() => new TheSingleton(probe)); + + public TheSingleton(IActorRef probe) + { + this.probe = probe; + probe.Tell("started"); + } + + protected override void OnReceive(object message) + { + Sender.Tell(message); + } + + protected override void PostStop() + { + probe.Tell("stopped"); + } + } + + private readonly ActorSystem[] _systems; + private readonly TestProbe[] _probes; + + public ClusterSingletonLeavingSpeedSpec() : base(@" + akka.loglevel = INFO + akka.actor.provider = ""cluster"" + akka.cluster.auto-down-unreachable-after = 2s + + # With 10 systems and setting min-number-of-hand-over-retries to 5 and gossip-interval to 2s it's possible to + # reproduce the ClusterSingletonManagerIsStuck and slow hand over in issue #25639 + # akka.cluster.singleton.min-number-of-hand-over-retries = 5 + # akka.cluster.gossip-interval = 2s + + akka.remote { + dot-netty.tcp { + hostname = ""127.0.0.1"" + port = 0 + } + }") + { + _systems = Enumerable.Range(1, 3).Select(n => + ActorSystem.Create(Sys.Name, ConfigurationFactory.ParseString($"akka.cluster.roles=[role-{n % 3}]").WithFallback(Sys.Settings.Config))).ToArray(); + + _probes = _systems.Select(i => CreateTestProbe()).ToArray(); + } + + public void Join(ActorSystem from, ActorSystem to, IActorRef probe) + { + from.ActorOf(ClusterSingletonManager.Props( + TheSingleton.props(probe), + PoisonPill.Instance, + ClusterSingletonManagerSettings.Create(from)), "echo"); + + Cluster.Get(from).Join(Cluster.Get(to).SelfAddress); + + Within(TimeSpan.FromSeconds(15), () => + { + AwaitAssert(() => + { + Cluster.Get(from).State.Members.Select(x => x.UniqueAddress).Should().Contain(Cluster.Get(from).SelfUniqueAddress); + Cluster.Get(from) + .State.Members.Select(x => x.Status) + .ToImmutableHashSet() + .Should() + .Equal(ImmutableHashSet.Empty.Add(MemberStatus.Up)); + }); + }); + } + + [Fact] + public void ClusterSingleton_that_is_leaving_must() + { + ClusterSingleton_that_is_leaving_must_join_cluster(); + ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest(); + } + + private void ClusterSingleton_that_is_leaving_must_join_cluster() + { + for (int i = 0; i < _systems.Length; i++) + Join(_systems[i], _systems[0], _probes[i]); + + // leader is most likely on system, lowest port + Join(Sys, _systems[0], TestActor); + + _probes[0].ExpectMsg("started"); + } + + private void ClusterSingleton_that_is_leaving_must_quickly_hand_over_to_next_oldest() + { + List<(TimeSpan stoppedDuration, TimeSpan startedDuration)> durations = new List<(TimeSpan, TimeSpan)>(); + + Stopwatch sw = new Stopwatch(); + sw.Start(); + for (int i = 0; i < _systems.Length; i++) + { + var leaveAddress = Cluster.Get(_systems[i]).SelfAddress; + CoordinatedShutdown.Get(_systems[i]).Run(CoordinatedShutdown.ClusterLeavingReason.Instance); + _probes[i].ExpectMsg("stopped", TimeSpan.FromSeconds(10)); + var 
stoppedDuration = sw.Elapsed;
+
+                if (i != _systems.Length - 1)
+                    _probes[i + 1].ExpectMsg("started", TimeSpan.FromSeconds(30));
+                else
+                    ExpectMsg("started", TimeSpan.FromSeconds(30));
+
+                var startedDuration = sw.Elapsed;
+
+                Within(TimeSpan.FromSeconds(15), () =>
+                {
+                    AwaitAssert(() =>
+                    {
+                        Cluster.Get(_systems[i]).IsTerminated.Should().BeTrue();
+                        Cluster.Get(Sys).State.Members.Select(m => m.Address).Should().NotContain(leaveAddress);
+
+                        foreach (var sys in _systems)
+                        {
+                            if (!Cluster.Get(sys).IsTerminated)
+                                Cluster.Get(sys).State.Members.Select(m => m.Address).Should().NotContain(leaveAddress);
+                        }
+                    });
+                });
+
+                Log.Info($"Singleton {i} stopped in {(int)stoppedDuration.TotalMilliseconds} ms, started in {(int)startedDuration.TotalMilliseconds} ms, diff {(int)(startedDuration - stoppedDuration).TotalMilliseconds} ms");
+                durations.Add((stoppedDuration, startedDuration));
+            }
+            sw.Stop();
+
+            for (int i = 0; i < durations.Count; i++)
+            {
+                Log.Info($"Singleton {i} stopped in {(int)durations[i].stoppedDuration.TotalMilliseconds} ms, started in {(int)durations[i].startedDuration.TotalMilliseconds} ms, diff {(int)(durations[i].startedDuration - durations[i].stoppedDuration).TotalMilliseconds} ms");
+            }
+        }
+
+        protected override void AfterTermination()
+        {
+            foreach (var s in _systems)
+                Shutdown(s);
+        }
+    }
+}
diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs
index df3486327e4..0c19e95e704 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests/Singleton/ClusterSingletonRestart2Spec.cs
@@ -30,6 +30,7 @@ public ClusterSingletonRestart2Spec() : base(@"
                 akka.actor.provider = ""cluster""
                 akka.cluster.roles = [singleton]
                 akka.cluster.auto-down-unreachable-after = 2s
+                akka.cluster.singleton.min-number-of-hand-over-retries = 5
                 akka.remote {
                     dot-netty.tcp {
                         hostname = ""127.0.0.1""
                         port = 0
                     }
                 }
@@ -107,7 +108,7 @@ public void Restarting_cluster_node_during_hand_over_must_restart_singletons_in_
             // let it stabilize
             Task.Delay(TimeSpan.FromSeconds(5)).Wait();
-
+
             Within(TimeSpan.FromSeconds(10), () =>
             {
                 AwaitAssert(() =>
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
index 81020dbffd9..84481dae06b 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
+++ b/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/DistributedPubSubMediator.cs
@@ -167,8 +167,8 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
             {
                 var routees = new List<Routee>();
                 ValueHolder valueHolder;
-                if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket) &&
-                    bucket.Content.TryGetValue(send.Path, out valueHolder) &&
+                if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket) &&
+                    bucket.Content.TryGetValue(send.Path, out valueHolder) &&
                     send.LocalAffinity)
                 {
                     var routee = valueHolder.Routee;
@@ -275,7 +275,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
                         child.Forward(unsubscribe);
                     else
                     {
-                        // no such topic here
+                        // no such topic here
                     }
                 });
             });
@@ -357,6 +357,14 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
                     _registry.Remove(left.Member.Address);
                 }
             });
+            Receive<ClusterEvent.MemberDowned>(downed =>
+            {
+                if (IsMatchingRole(downed.Member))
+                {
+                    _nodes.Remove(downed.Member.Address);
+
_registry.Remove(downed.Member.Address); + } + }); Receive(removed => { var member = removed.Member; @@ -380,7 +388,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings) }); Receive(msg => { - var encTopic = Internal.Utils.EncodeName(msg.Topic); + var encTopic = Internal.Utils.EncodeName(msg.Topic); _buffer.BufferOr(Internal.Utils.MakeKey(Self.Path / encTopic), msg, Sender, () => { var child = Context.Child(encTopic); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs index e6fc0ea1daf..d91fc690f14 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonManager.cs @@ -598,7 +598,7 @@ private void SetupCoordinatedShutdown() protected override void PreStart() { // subscribe to cluster changes, re-subscribe when restart - _cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.MemberRemoved)); + _cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents, typeof(ClusterEvent.MemberRemoved), typeof(ClusterEvent.MemberDowned)); SetTimer(CleanupTimer, Cleanup.Instance, TimeSpan.FromMinutes(1.0), repeat: true); @@ -678,12 +678,14 @@ private State GoToHandingOver(IAct } handOverTo?.Tell(HandOverInProgress.Instance); + Log.Info("Singleton manager stopping singleton actor [{0}]", singleton.Path); singleton.Tell(_terminationMessage); return GoTo(ClusterSingletonState.HandingOver).Using(new HandingOverData(singleton, handOverTo)); } private State GoToStopping(IActorRef singleton) { + Log.Info("Singleton manager stopping singleton actor [{0}]", singleton.Path); singleton.Tell(_terminationMessage); return GoTo(ClusterSingletonState.Stopping).Using(new StoppingData(singleton)); } @@ -741,6 +743,11 @@ private void InitializeFSM() return Stay().Using(new YoungerData(oldestChanged.Oldest)); } } + else if (e.FsmEvent is MemberDowned memberDowned && memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + { + Log.Info("Self downed, stopping ClusterSingletonManager"); + return Stop(); + } else if (e.FsmEvent is MemberRemoved memberRemoved) { if (memberRemoved.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) @@ -799,6 +806,11 @@ private void InitializeFSM() return Stay(); } } + else if (e.FsmEvent is MemberDowned memberDowned && memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + { + Log.Info("Self downed, stopping ClusterSingletonManager"); + return Stop(); + } else if (e.FsmEvent is MemberRemoved memberRemoved) { if (memberRemoved.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) @@ -925,6 +937,7 @@ private void InitializeFSM() } else if (e.FsmEvent is Terminated terminated && e.StateData is OldestData o && terminated.ActorRef.Equals(o.Singleton)) { + Log.Info("Singleton actor [{0}] was terminated", o.Singleton.Path); return Stay().Using(new OldestData(o.Singleton, true)); } else if (e.FsmEvent is SelfExiting) @@ -934,6 +947,19 @@ private void InitializeFSM() Sender.Tell(Done.Instance); // reply to ask return Stay(); } + else if (e.FsmEvent is MemberDowned memberDowned && e.StateData is OldestData od && memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + { + if (od.SingletonTerminated) + { + Log.Info("Self downed, stopping ClusterSingletonManager"); + return Stop(); + } + else + { + Log.Info("Self downed, stopping"); + return GoToStopping(od.Singleton); + } + } return null; }); @@ 
-949,7 +975,10 @@ private void InitializeFSM() } else if (takeOverRetry.Count <= _maxTakeOverRetries) { - Log.Info("Retry [{0}], sending TakeOverFromMe to [{1}]", takeOverRetry.Count, wasOldestData.NewOldest?.Address); + if (_maxTakeOverRetries - takeOverRetry.Count <= 3) + Log.Info("Retry [{0}], sending TakeOverFromMe to [{1}]", takeOverRetry.Count, wasOldestData.NewOldest?.Address); + else + Log.Debug("Retry [{0}], sending TakeOverFromMe to [{1}]", takeOverRetry.Count, wasOldestData.NewOldest?.Address); if (wasOldestData.NewOldest != null) Peer(wasOldestData.NewOldest.Address).Tell(TakeOverFromMe.Instance); @@ -986,6 +1015,7 @@ private void InitializeFSM() && e.StateData is WasOldestData oldestData && t.ActorRef.Equals(oldestData.Singleton)) { + Log.Info("Singleton actor [{0}] was terminated", oldestData.Singleton.Path); return Stay().Using(new WasOldestData(oldestData.Singleton, true, oldestData.NewOldest)); } else if (e.FsmEvent is SelfExiting) @@ -995,6 +1025,19 @@ private void InitializeFSM() Sender.Tell(Done.Instance); // reply to ask return Stay(); } + else if (e.FsmEvent is MemberDowned memberDowned && e.StateData is WasOldestData od && memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + { + if (od.SingletonTerminated) + { + Log.Info("Self downed, stopping ClusterSingletonManager"); + return Stop(); + } + else + { + Log.Info("Self downed, stopping"); + return GoToStopping(od.Singleton); + } + } return null; }); @@ -1032,6 +1075,7 @@ private void InitializeFSM() && e.StateData is StoppingData stoppingData && terminated.ActorRef.Equals(stoppingData.Singleton)) { + Log.Info("Singleton actor [{0}] was terminated", stoppingData.Singleton.Path); return Stop(); } @@ -1046,6 +1090,11 @@ private void InitializeFSM() Log.Info("Self removed, stopping ClusterSingletonManager"); return Stop(); } + if (e.FsmEvent is OldestChangedBuffer.OldestChanged || e.FsmEvent is HandOverToMe) + { + // not interested anymore - waiting for removal + return Stay(); + } return null; }); @@ -1086,7 +1135,7 @@ private void InitializeFSM() } if (e.FsmEvent is TakeOverFromMe) { - Log.Info("Ignoring TakeOver request in [{0}] from [{1}].", StateName, Sender.Path.Address); + Log.Debug("Ignoring TakeOver request in [{0}] from [{1}].", StateName, Sender.Path.Address); return Stay(); } if (e.FsmEvent is Cleanup) @@ -1094,6 +1143,12 @@ private void InitializeFSM() CleanupOverdueNotMemberAnyMore(); return Stay(); } + if (e.FsmEvent is MemberDowned memberDowned) + { + if (memberDowned.Member.UniqueAddress.Equals(_cluster.SelfUniqueAddress)) + Log.Info("Self downed, waiting for removal"); + return Stay(); + } return null; }); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs index 303ad6ee63c..473d7801a20 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/ClusterSingletonProxy.cs @@ -41,7 +41,7 @@ public sealed class ClusterSingletonProxy : ReceiveActor /// /// TBD /// - internal sealed class TryToIdentifySingleton + internal sealed class TryToIdentifySingleton : INoSerializationVerificationNeeded { /// /// TBD diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs index bf7ff57398a..9ea79362490 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs +++ 
b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/OldestChangedBuffer.cs
@@ -153,10 +153,15 @@ private bool MatchingRole(Member member)
         private void HandleInitial(ClusterEvent.CurrentClusterState state)
         {
             _membersByAge = state.Members
-                .Where(m => (m.Status == MemberStatus.Up || m.Status == MemberStatus.Leaving) && MatchingRole(m))
+                .Where(m => (m.Status == MemberStatus.Up) && MatchingRole(m))
                 .ToImmutableSortedSet(MemberAgeOrdering.Descending);
+            // If there is some removal in progress of an older node it's not safe to immediately become oldest,
+            // removal of younger nodes doesn't matter. Note that it can also be started via restart after
+            // ClusterSingletonManagerIsStuck.
-            var safeToBeOldest = !state.Members.Any(m => m.Status == MemberStatus.Down || m.Status == MemberStatus.Exiting);
+            int selfUpNumber = state.Members.Where(m => m.UniqueAddress == _cluster.SelfUniqueAddress).Select(m => (int?)m.UpNumber).FirstOrDefault() ?? int.MaxValue;
+
+            var safeToBeOldest = !state.Members.Any(m => (m.UpNumber < selfUpNumber && MatchingRole(m)) && (m.Status == MemberStatus.Down || m.Status == MemberStatus.Exiting || m.Status == MemberStatus.Leaving));
             var initial = new InitialOldestState(_membersByAge.FirstOrDefault()?.UniqueAddress, safeToBeOldest);
             _changes = _changes.Enqueue(initial);
         }
diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf
index 49d7a62c7c0..06c0be61f17 100644
--- a/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf
+++ b/src/contrib/cluster/Akka.Cluster.Tools/Singleton/reference.conf
@@ -9,21 +9,31 @@ akka.cluster.singleton {
   # The actor name of the child singleton actor.
   singleton-name = "singleton"
-
+
   # Singleton among the nodes tagged with specified role.
   # If the role is not specified it's a singleton among all nodes in the cluster.
   role = ""
-
-  # When a node is becoming oldest it sends hand-over request to previous oldest,
-  # that might be leaving the cluster. This is retried with this interval until
-  # the previous oldest confirms that the hand over has started or the previous
+
+  # When a node is becoming oldest it sends hand-over request to previous oldest,
+  # that might be leaving the cluster. This is retried with this interval until
+  # the previous oldest confirms that the hand over has started or the previous
   # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
   hand-over-retry-interval = 1s
-
+
   # The number of retries are derived from hand-over-retry-interval and
   # akka.cluster.down-removal-margin (or ClusterSingletonManagerSettings.removalMargin),
   # but it will never be less than this property.
-  min-number-of-hand-over-retries = 10
+  # If, after these retries, the new oldest node is still not able to exchange the hand-over
+  # messages with the previous oldest, it will restart itself by throwing
+  # ClusterSingletonManagerIsStuck, to start from a clean state. Even after that it will not
+  # start the singleton instance until the previous oldest node has been removed from the cluster.
+  # On the other side, the previous oldest node uses the same number of retries minus 3, and
+  # stops the singleton instance after that.
+  # For large clusters it might be necessary to increase this value to avoid too-early timeouts
+  # while gossip dissemination of the Leaving-to-Exiting transition occurs. For normal leaving
+  # scenarios reducing this value will not make the hand-over quicker, but in extreme failure
+  # scenarios it can make recovery faster.
+  min-number-of-hand-over-retries = 15
 }
 # //#singleton-config
@@ -31,22 +41,22 @@ akka.cluster.singleton {
 akka.cluster.singleton-proxy {
   # The actor name of the singleton actor that is started by the ClusterSingletonManager
   singleton-name = ${akka.cluster.singleton.singleton-name}
-
+
   # The role of the cluster nodes where the singleton can be deployed.
   # If the role is not specified then any node will do.
   role = ""
-
+
   # Interval at which the proxy will try to resolve the singleton instance.
   singleton-identification-interval = 1s
-
+
   # If the location of the singleton is unknown the proxy will buffer this
-  # number of messages and deliver them when the singleton is identified.
+  # number of messages and deliver them when the singleton is identified.
   # When the buffer is full old messages will be dropped when new messages are
   # sent via the proxy.
   # Use 0 to disable buffering, i.e. messages will be dropped immediately if
   # the location of the singleton is unknown.
   # Maximum allowed buffer size is 10000.
-  buffer-size = 1000
+  buffer-size = 1000
 }
 # //#singleton-proxy-config
diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt
index 96ac4d2ebe0..f6b124bb75b 100644
--- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt
+++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCluster.approved.txt
@@ -92,6 +92,10 @@ namespace Akka.Cluster
         public override int GetHashCode() { }
         public override string ToString() { }
     }
+    public sealed class MemberDowned : Akka.Cluster.ClusterEvent.MemberStatusChange
+    {
+        public MemberDowned(Akka.Cluster.Member member) { }
+    }
     public sealed class MemberExited : Akka.Cluster.ClusterEvent.MemberStatusChange
     {
         public MemberExited(Akka.Cluster.Member member) { }
diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt
index 2389db69aa1..a2b798b4688 100644
--- a/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt
+++ b/src/core/Akka.API.Tests/CoreAPISpec.ApprovePersistence.approved.txt
@@ -1,4 +1,5 @@
-[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence.TCK")]
+[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
+[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence.TCK")]
 [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence.Tests")]
 [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
 [assembly: System.Runtime.InteropServices.GuidAttribute("e3bcba88-003c-4cda-8a60-f0c2553fe3c8")]
diff --git a/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs b/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs
index d9f8a80996e..049367583c8 100644
--- a/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs
+++ b/src/core/Akka.Cluster.Tests/ClusterDomainEventSpec.cs
@@ -157,6 +157,24 @@ public void DomainEvents_must_be_produced_for_members_becoming_reachable_after_u
                 .BeEquivalentTo(ImmutableList.Create());
         }
+        [Fact]
+        public void DomainEvents_must_be_produced_for_downed_members()
+        {
+            var t1 = Converge(new Gossip(ImmutableSortedSet.Create(aUp, eUp)));
+            var t2 = Converge(new Gossip(ImmutableSortedSet.Create(aUp, eDown)));
+
+            var g1 = t1.Item1;
+            var g2 = t2.Item1;
+
+            ClusterEvent.DiffMemberEvents(g1, g2)
+                .Should()
+                .BeEquivalentTo(ImmutableList.Create(new ClusterEvent.MemberDowned(eDown)));
+
+            ClusterEvent.DiffUnreachable(g1, g2, selfDummyAddress)
+                .Should()
+                .BeEquivalentTo(ImmutableList.Create<ClusterEvent.UnreachableMember>());
+        }
+
         [Fact]
         public void DomainEvents_must_be_produced_for_removed_members()
         {
diff --git a/src/core/Akka.Cluster.Tests/ClusterSpec.cs b/src/core/Akka.Cluster.Tests/ClusterSpec.cs
index 86a976eaff6..ab8c4065d89 100644
--- a/src/core/Akka.Cluster.Tests/ClusterSpec.cs
+++ b/src/core/Akka.Cluster.Tests/ClusterSpec.cs
@@ -593,6 +593,7 @@ public void A_cluster_must_terminate_ActorSystem_via_Down_CoordinatedShutdown()
             Cluster.Get(sys3).Down(Cluster.Get(sys3).SelfAddress);
+            probe.ExpectMsg<ClusterEvent.MemberDowned>();
             probe.ExpectMsg<ClusterEvent.MemberRemoved>();
             AwaitCondition(() => sys3.WhenTerminated.IsCompleted, TimeSpan.FromSeconds(10));
             Cluster.Get(sys3).IsTerminated.Should().BeTrue();
diff --git a/src/core/Akka.Cluster.Tests/GossipSpec.cs b/src/core/Akka.Cluster.Tests/GossipSpec.cs
index 5864d29be7f..67e538c1735 100644
--- a/src/core/Akka.Cluster.Tests/GossipSpec.cs
+++ b/src/core/Akka.Cluster.Tests/GossipSpec.cs
@@ -142,7 +142,7 @@ public void A_gossip_must_merge_unreachable()
             var merged1 = g1.Merge(g2);
             merged1.Overview.Reachability.AllUnreachable.Should()
                 .BeEquivalentTo(ImmutableHashSet.Create(a1.UniqueAddress, c1.UniqueAddress, d1.UniqueAddress));
-
+
             var merged2 = g2.Merge(g1);
             merged2.Overview.Reachability.AllUnreachable.Should()
                 .BeEquivalentTo(merged1.Overview.Reachability.AllUnreachable);
@@ -223,6 +223,40 @@ public void A_gossip_must_know_who_is_youngest()
             var g3 = new Gossip(ImmutableSortedSet.Create(a2, b1.CopyUp(3), e2.CopyUp(4)));
             g3.YoungestMember.Should().Be(e2);
         }
+
+        [Fact]
+        public void A_gossip_must_find_two_oldest_as_targets_for_Exiting_change()
+        {
+            Member a1 = TestMember.Create(new Address("akka.tcp", "sys", "a4", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 1);
+            Member a2 = TestMember.Create(new Address("akka.tcp", "sys", "a3", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 2);
+            Member a3 = TestMember.Create(new Address("akka.tcp", "sys", "a2", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 3);
+            Member a4 = TestMember.Create(new Address("akka.tcp", "sys", "a1", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 4);
+
+            var a1Exiting = a1.Copy(MemberStatus.Leaving).Copy(MemberStatus.Exiting);
+            var gossip = new Gossip(ImmutableSortedSet.Create(a1Exiting, a2, a3, a4));
+            var r = ClusterCoreDaemon.GossipTargetsForExitingMembers(gossip, new Member[] { a1Exiting });
+            r.Should().BeEquivalentTo(new[] { a1Exiting, a2 });
+        }
+
+        [Fact]
+        public void A_gossip_must_find_two_oldest_per_role_as_targets_for_Exiting_change()
+        {
+            Member a1 = TestMember.Create(new Address("akka.tcp", "sys", "a4", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 1);
+            Member a2 = TestMember.Create(new Address("akka.tcp", "sys", "a3", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 2);
+            Member a3 = TestMember.Create(new Address("akka.tcp", "sys", "a2", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 3);
+            Member a4 = TestMember.Create(new Address("akka.tcp", "sys", "a1", 2552), MemberStatus.Up, ImmutableHashSet<string>.Empty, upNumber: 4);
+            Member a5 = TestMember.Create(new Address("akka.tcp", "sys", "a5", 2552), MemberStatus.Exiting, ImmutableHashSet<string>.Empty.Add("role1").Add("role2"), upNumber: 5);
+            Member a6 = TestMember.Create(new Address("akka.tcp", "sys", "a6", 2552), MemberStatus.Exiting, ImmutableHashSet<string>.Empty.Add("role1").Add("role3"), upNumber: 6);
Address("akka.tcp", "sys", "a6", 2552), MemberStatus.Exiting, ImmutableHashSet.Empty.Add("role1").Add("role3"), upNumber: 6); + Member a7 = TestMember.Create(new Address("akka.tcp", "sys", "a7", 2552), MemberStatus.Exiting, ImmutableHashSet.Empty.Add("role1"), upNumber: 7); + Member a8 = TestMember.Create(new Address("akka.tcp", "sys", "a8", 2552), MemberStatus.Exiting, ImmutableHashSet.Empty.Add("role1"), upNumber: 8); + Member a9 = TestMember.Create(new Address("akka.tcp", "sys", "a9", 2552), MemberStatus.Exiting, ImmutableHashSet.Empty.Add("role2"), upNumber: 9); + + IEnumerable theExiting = new Member[] { a5, a6 }; + var gossip = new Gossip(ImmutableSortedSet.Create(a1, a2, a3, a4, a5, a6, a7, a8, a9)); + + var r = ClusterCoreDaemon.GossipTargetsForExitingMembers(gossip, theExiting); + r.Should().BeEquivalentTo(new[] { a1, a2, a5, a6, a9 }); + } } } diff --git a/src/core/Akka.Cluster/ClusterDaemon.cs b/src/core/Akka.Cluster/ClusterDaemon.cs index 14e9171ad79..a41cbac29c6 100644 --- a/src/core/Akka.Cluster/ClusterDaemon.cs +++ b/src/core/Akka.Cluster/ClusterDaemon.cs @@ -980,6 +980,7 @@ internal class ClusterCoreDaemon : UntypedActor, IRequiresMessageQueue _selfExiting = new TaskCompletionSource(); @@ -1532,7 +1534,7 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles) } else if (Gossip.RemoveUnreachableWithMemberStatus.Contains(selfStatus)) { - _cluster.LogInfo("Trying to join [{0}] to [{1}] member, ignoring. Use a member that is Up instead.", + _cluster.LogInfo("Trying to join [{0}] to [{1}] member, ignoring. Use a member that is Up instead.", node, selfStatus); } else @@ -1583,7 +1585,7 @@ public void Joining(UniqueAddress node, ImmutableHashSet roles) UpdateLatestGossip(newGossip); - + if (node.Equals(SelfUniqueAddress)) { @@ -1944,7 +1946,8 @@ public ReceiveGossipType ReceiveGossip(GossipEnvelope envelope) // ExitingCompleted will be received via CoordinatedShutdown to continue // the leaving process. Meanwhile the gossip state is not marked as seen. _exitingTasksInProgress = true; - _cluster.LogInfo("Exiting, starting coordinated shutdown."); + if (_coordShutdown.ShutdownReason == null) + _cluster.LogInfo("Exiting, starting coordinated shutdown."); _selfExiting.TrySetResult(Done.Instance); _coordShutdown.Run(CoordinatedShutdown.ClusterLeavingReason.Instance); } @@ -2162,7 +2165,7 @@ private void ShutdownSelfWhenDown() var unreachable = _latestGossip.Overview.Reachability.AllUnreachableOrTerminated; var downed = _latestGossip.Members.Where(m => m.Status == MemberStatus.Down) .Select(m => m.UniqueAddress).ToList(); - if (downed.All(node => unreachable.Contains(node) || _latestGossip.SeenByNode(node))) + if (_selfDownCounter >= MaxTicksBeforeShuttingDownMyself || downed.All(node => unreachable.Contains(node) || _latestGossip.SeenByNode(node))) { // the reason for not shutting down immediately is to give the gossip a chance to spread // the downing information to other downed nodes, so that they can shutdown themselves @@ -2172,6 +2175,10 @@ private void ShutdownSelfWhenDown() SendGossipRandom(MaxGossipsBeforeShuttingDownMyself); Shutdown(); } + else + { + _selfDownCounter++; + } } } @@ -2290,7 +2297,8 @@ public void LeaderActionsOnConvergence() // the leaving process. Meanwhile the gossip state is not marked as seen. 
_exitingTasksInProgress = true; - _cluster.LogInfo("Exiting (leader), starting coordinated shutdown."); + if (_coordShutdown.ShutdownReason == null) + _cluster.LogInfo("Exiting (leader), starting coordinated shutdown."); _selfExiting.TrySetResult(Done.Instance); _coordShutdown.Run(CoordinatedShutdown.ClusterLeavingReason.Instance); } @@ -2315,6 +2323,26 @@ public void LeaderActionsOnConvergence() } Publish(_latestGossip); + GossipExitingMembersToOldest(changedMembers.Where(i => i.Status == MemberStatus.Exiting)); + } + } + + /// + /// Gossip the Exiting change to the two oldest nodes for quick dissemination to potential Singleton nodes + /// + /// + private void GossipExitingMembersToOldest(IEnumerable exitingMembers) + { + var targets = GossipTargetsForExitingMembers(_latestGossip, exitingMembers); + if (targets != null && targets.Any()) + { + if (_log.IsDebugEnabled) + _log.Debug( + "Cluster Node [{0}] - Gossip exiting members [{1}] to the two oldest (per role) [{2}] (singleton optimization).", + SelfUniqueAddress, string.Join(", ", exitingMembers), string.Join(", ", targets)); + + foreach (var m in targets) + GossipTo(m.UniqueAddress); } } @@ -2465,6 +2493,35 @@ public bool ValidNodeForGossip(UniqueAddress node) _latestGossip.ReachabilityExcludingDownedObservers.Value.IsReachable(node); } + /// + /// The Exiting change is gossiped to the two oldest nodes for quick dissemination to potential Singleton nodes + /// + /// + /// + /// + public static IEnumerable GossipTargetsForExitingMembers(Gossip latestGossip, IEnumerable exitingMembers) + { + if (exitingMembers.Any()) + { + var roles = exitingMembers.SelectMany(m => m.Roles); + var membersSortedByAge = latestGossip.Members.OrderBy(m => m, Member.AgeOrdering); + var targets = new HashSet(); + + var t = membersSortedByAge.Take(2).ToArray(); // 2 oldest of all nodes + targets.UnionWith(t); + + foreach (var role in roles) + { + t = membersSortedByAge.Where(i => i.HasRole(role)).Take(2).ToArray(); // 2 oldest with the role + if (t.Length > 0) + targets.UnionWith(t); + } + + return targets; + } + return null; + } + /// /// Updates the local gossip with the latest received from over the network. /// diff --git a/src/core/Akka.Cluster/ClusterEvent.cs b/src/core/Akka.Cluster/ClusterEvent.cs index 666851045c0..17c79bad563 100644 --- a/src/core/Akka.Cluster/ClusterEvent.cs +++ b/src/core/Akka.Cluster/ClusterEvent.cs @@ -32,7 +32,7 @@ public class ClusterEvent /// public enum SubscriptionInitialStateMode { - /// + /// /// When using this subscription mode a snapshot of /// will be sent to the /// subscriber as the first message. @@ -84,7 +84,7 @@ public CurrentClusterState() : this( ImmutableHashSet
.Empty, null, ImmutableDictionary.Empty) - {} + { } /// /// Creates a new instance of the current cluster state. @@ -341,6 +341,22 @@ public MemberExited(Member member) } } + /// + /// Member status changed to `MemberStatus.Down` and will be removed + /// when all members have seen the `Down` status. + /// + public sealed class MemberDowned : MemberStatusChange + { + /// + /// Initializes a new instance of the class. + /// + /// The node that changed state. + public MemberDowned(Member member) + : base(member, MemberStatus.Down) + { + } + } + /// /// /// This class represents a event where the @@ -362,8 +378,8 @@ public sealed class MemberRemoved : MemberStatusChange /// /// The status of the node before the state change event. /// - public MemberStatus PreviousStatus - { + public MemberStatus PreviousStatus + { get { return _previousStatus; } } @@ -397,7 +413,7 @@ public override int GetHashCode() unchecked { var hash = 17; - hash = hash * + base.GetHashCode(); + hash = hash * +base.GetHashCode(); hash = hash * 23 + _previousStatus.GetHashCode(); return hash; } @@ -508,7 +524,7 @@ public override bool Equals(object obj) { var other = obj as RoleLeaderChanged; if (other == null) return false; - return _role.Equals(other._role) + return _role.Equals(other._role) && ((_leader == null && other._leader == null) || (_leader != null && _leader.Equals(other._leader))); } @@ -852,8 +868,8 @@ internal static ImmutableList DiffMemberEvents(Gossip oldGossip, G .GroupBy(m => m.UniqueAddress); var changedMembers = membersGroupedByAddress - .Where(g => g.Count() == 2 - && (g.First().Status != g.Skip(1).First().Status + .Where(g => g.Count() == 2 + && (g.First().Status != g.Skip(1).First().Status || g.First().UpNumber != g.Skip(1).First().UpNumber)) .Select(g => g.First()); @@ -885,6 +901,9 @@ private static IEnumerable CollectMemberEvents(IEnumerable case MemberStatus.Exiting: yield return new MemberExited(member); break; + case MemberStatus.Down: + yield return new MemberDowned(member); + break; } } } @@ -978,7 +997,7 @@ internal static ImmutableList DiffReachability(Gossip oldGo /// /// INTERNAL API. - /// + /// /// Publishes s out to all subscribers. /// internal sealed class ClusterDomainEventPublisher : ReceiveActor, IRequiresMessageQueue @@ -1019,7 +1038,7 @@ protected override void PostStop() private readonly EventStream _eventStream; /// - /// The current snapshot state corresponding to latest gossip + /// The current snapshot state corresponding to latest gossip /// to mimic what you would have seen if you were listening to the events. 
/// private void SendCurrentClusterState(IActorRef receiver) @@ -1054,7 +1073,7 @@ private void Subscribe(IActorRef subscriber, ClusterEvent.SubscriptionInitialSta }; PublishDiff(Gossip.Empty, _latestGossip, pub); } - else if(initMode == ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot) + else if (initMode == ClusterEvent.SubscriptionInitialStateMode.InitialStateAsSnapshot) { SendCurrentClusterState(subscriber); } diff --git a/src/core/Akka.Cluster/CoordinatedShutdownLeave.cs b/src/core/Akka.Cluster/CoordinatedShutdownLeave.cs index 6fb3534237b..5148e2121ac 100644 --- a/src/core/Akka.Cluster/CoordinatedShutdownLeave.cs +++ b/src/core/Akka.Cluster/CoordinatedShutdownLeave.cs @@ -45,7 +45,7 @@ public CoordinatedShutdownLeave() { // MemberRemoved is needed in case it was downed instead _cluster.Leave(_cluster.SelfAddress); - _cluster.Subscribe(Self, typeof(ClusterEvent.MemberLeft), typeof(ClusterEvent.MemberRemoved)); + _cluster.Subscribe(Self, typeof(ClusterEvent.MemberLeft), typeof(ClusterEvent.MemberRemoved), typeof(ClusterEvent.MemberDowned)); var s = Sender; Become(() => WaitingLeaveCompleted(s)); }); diff --git a/src/core/Akka.Persistence/Eventsourced.cs b/src/core/Akka.Persistence/Eventsourced.cs index aba68f43706..012cc871b50 100644 --- a/src/core/Akka.Persistence/Eventsourced.cs +++ b/src/core/Akka.Persistence/Eventsourced.cs @@ -40,7 +40,7 @@ public StashingHandlerInvocation(object evt, Action handler) /// /// Unlike this one does not force actor to stash commands. - /// Originates from + /// Originates from /// or method calls. /// public sealed class AsyncHandlerInvocation : IPendingHandlerInvocation @@ -132,7 +132,7 @@ protected Eventsourced() /// Called when the persistent actor is started for the first time. /// The returned object defines how the actor /// will recover its persistent state before handling the first incoming message. - /// + /// /// To skip recovery completely return . /// public virtual Recovery Recovery => Recovery.Default; @@ -187,7 +187,7 @@ public IStash Stash public bool IsRecoveryFinished => !IsRecovering; /// - /// Highest received sequence number so far or `0L` if this actor + /// Highest received sequence number so far or `0L` if this actor /// hasn't replayed or stored any persistent events yet. /// public long LastSequenceNr { get; private set; } @@ -211,7 +211,7 @@ public void LoadSnapshot(string persistenceId, SnapshotSelectionCriteria criteri /// /// Saves of current state. - /// + /// /// The will be notified about the success or failure of this /// via an or message. /// @@ -223,7 +223,7 @@ public void SaveSnapshot(object snapshot) /// /// Deletes the snapshot identified by . - /// + /// /// The will be notified about the status of the deletion /// via an or message. /// @@ -235,7 +235,7 @@ public void DeleteSnapshot(long sequenceNr) /// /// Deletes all snapshots matching . - /// + /// /// The will be notified about the status of the deletion /// via an or message. /// @@ -245,14 +245,14 @@ public void DeleteSnapshots(SnapshotSelectionCriteria criteria) SnapshotStore.Tell(new DeleteSnapshots(SnapshotterId, criteria)); } - /// - /// Recovery handler that receives persistent events during recovery. If a state snapshot has been captured and saved, + /// + /// Recovery handler that receives persistent events during recovery. If a state snapshot has been captured and saved, /// this handler will receive a message followed by events that are younger than offer itself. 
- /// + /// /// This handler must not have side-effects other than changing persistent actor state i.e. it /// should not perform actions that may fail, such as interacting with external services, /// for example. - /// + /// /// If there is a problem with recovering the state of the actor from the journal, the error /// will be logged and the actor will be stopped. /// @@ -268,21 +268,21 @@ public void DeleteSnapshots(SnapshotSelectionCriteria criteria) /// TBD protected abstract bool ReceiveCommand(object message); - /// + /// /// Asynchronously persists an . On successful persistence, the /// is called with the persisted event. This method guarantees that no new commands will be received by a persistent actor /// between a call to and execution of its handler. It also /// holds multiple persist calls per received command. Internally this is done by stashing. The stash used /// for that is an internal stash which doesn't interfere with the inherited user stash. - /// - /// + /// + /// /// An event may close over eventsourced actor state and modify it. Sender of the persistent event /// is considered a sender of the corresponding command. That means one can respond to sender from within an event handler. - /// - /// - /// Within an event handler, applications usually update persistent actor state using + /// + /// + /// Within an event handler, applications usually update persistent actor state using /// persisted event data, notify listeners and reply to command senders. - /// + /// /// /// If persistence of an event fails, will be invoked and the actor will /// unconditionally be stopped. The reason that it cannot resume when persist fails is that it @@ -338,24 +338,24 @@ public void PersistAll(IEnumerable events, Action handle _eventBatch.AddFirst(new AtomicWrite(persistents.ToImmutable())); } - /// + /// /// Asynchronously persists an . On successful persistence, the /// is called with the persisted event. Unlike method, /// this one will continue to receive incoming commands between calls and executing it's event . - /// - /// - /// This version should be used in favor of + /// + /// + /// This version should be used in favor of /// method when throughput is more important that commands execution precedence. - /// - /// + /// + /// /// An event may close over eventsourced actor state and modify it. Sender of the persistent event /// is considered a sender of the corresponding command. That means, one can respond to sender from within an event handler. - /// - /// - /// Within an event handler, applications usually update persistent actor state using + /// + /// + /// Within an event handler, applications usually update persistent actor state using /// persisted event data, notify listeners and reply to command senders. - /// - /// + /// + /// /// If persistence of an event fails, will be invoked and the actor will /// unconditionally be stopped. The reason that it cannot resume when persist fails is that it /// is unknown if the event was actually persisted or not, and therefore it is in an inconsistent @@ -406,20 +406,20 @@ public void PersistAllAsync(IEnumerable events, Action h } /// - /// Defer the execution until all pending handlers have been executed. + /// Defer the execution until all pending handlers have been executed. /// Allows to define logic within the actor, which will respect the invocation-order-guarantee /// in respect to calls. /// That is, if was invoked before /// , the corresponding handlers will be /// invoked in the same order as they were registered in. 
-        ///
+        /// This call will NOT result in <paramref name="evt"/> being persisted, use
+        /// <see cref="Persist{TEvent}(TEvent,Action{TEvent})"/> or
+        /// <see cref="PersistAsync{TEvent}(TEvent,Action{TEvent})"/> instead if the given
+        /// <paramref name="evt"/> should be possible to replay.
-        ///
+        /// If there are no pending persist handler calls, the <paramref name="handler"/> will be called immediately.
-        ///
+        /// If persistence of an earlier event fails, the persistent actor will stop, and the
+        /// <paramref name="handler"/> will not be run.
         ///
@@ -455,6 +455,30 @@ public void DeleteMessages(long toSequenceNr)
             Journal.Tell(new DeleteMessagesTo(PersistenceId, toSequenceNr, Self));
         }
+
+        /// <summary>
+        /// An actor can request cleanup by deleting either a range of, or all, persistent events.
+        /// For example, on successful snapshot completion, delete messages within a configurable
+        /// range that are less than or equal to the sequence number of the given snapshot
+        /// metadata (provided that sequence number is less than or equal to <see cref="LastSequenceNr"/>).
+        ///
+        /// Or delete all events by using long.MaxValue as the toSequenceNr.
+        /// </summary>
+        /// <param name="e">The snapshot success notification carrying the snapshot metadata.</param>
+        /// <param name="keepNrOfBatches">The number of snapshot batches of events to keep.</param>
+        /// <param name="snapshotAfter">The configured number of events between snapshots.</param>
+        internal void InternalDeleteMessagesBeforeSnapshot(SaveSnapshotSuccess e, int keepNrOfBatches, int snapshotAfter)
+        {
+            // Delete old events but keep the latest around
+            // 1. It's not safe to delete all events immediately because snapshots are typically stored with
+            //    a weaker consistency level. A replay might "see" the deleted events before it sees the stored
+            //    snapshot, i.e. it could use an older snapshot and not replay the full sequence of events
+            // 2. If there is a production failure, it's useful to be able to inspect the events while debugging
+            var sequenceNr = e.Metadata.SequenceNr - keepNrOfBatches * snapshotAfter;
+            if (sequenceNr > 0)
+                DeleteMessages(sequenceNr);
+        }
+
        /// <summary>
        /// Called whenever a message replay succeeds.
        /// </summary>
diff --git a/src/core/Akka.Persistence/Properties/AssemblyInfo.cs b/src/core/Akka.Persistence/Properties/AssemblyInfo.cs
index 7fd712a46fe..5e185d9d376 100644
--- a/src/core/Akka.Persistence/Properties/AssemblyInfo.cs
+++ b/src/core/Akka.Persistence/Properties/AssemblyInfo.cs
@@ -9,12 +9,12 @@ using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
-// General Information about an assembly is controlled through the following
+// General Information about an assembly is controlled through the following
 // set of attributes. Change these attribute values to modify the information
 // associated with an assembly.
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
 // COM, set the ComVisible attribute to true on that type.
 [assembly: ComVisible(false)]
@@ -23,3 +23,4 @@
 [assembly: InternalsVisibleTo("Akka.Persistence.Tests")]
 [assembly: InternalsVisibleTo("Akka.Persistence.TCK")]
+[assembly: InternalsVisibleTo("Akka.Cluster.Sharding")]
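
For context on the new ClusterEvent.MemberDowned event introduced by this patch: it is ordinary public API, so any actor can subscribe to it the same way ClusterSingletonManager now does. The listener below is a minimal sketch and not part of the patch; DownedMemberListener and its stop-on-self-down policy are illustrative assumptions layered on the standard Akka.Cluster subscription API.

using Akka.Actor;
using Akka.Cluster;

public class DownedMemberListener : UntypedActor
{
    private readonly Cluster _cluster = Cluster.Get(Context.System);

    protected override void PreStart()
    {
        // Same subscription style the singleton manager now uses: replay the
        // initial state as events, filtered to the event types of interest.
        _cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents,
            typeof(ClusterEvent.MemberDowned), typeof(ClusterEvent.MemberRemoved));
    }

    protected override void PostStop() => _cluster.Unsubscribe(Self);

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case ClusterEvent.MemberDowned downed when downed.Member.Address.Equals(_cluster.SelfAddress):
                // Illustrative policy mirroring the manager's new behaviour: once
                // this node is downed, stop immediately instead of waiting for
                // the MemberRemoved that follows.
                Context.Stop(Self);
                break;
            case ClusterEvent.MemberDowned downed:
                // A remote member was downed; MemberRemoved will follow once all
                // nodes have seen the Down status.
                break;
        }
    }
}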
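The selection rule in GossipTargetsForExitingMembers can be checked in isolation: take the two oldest members overall, plus the two oldest members carrying each role of an exiting member. The standalone sketch below (Node and Targets are hypothetical stand-ins, not Akka.NET types) reproduces the expectation of the A_gossip_must_find_two_oldest_per_role_as_targets_for_Exiting_change test above.

using System;
using System.Collections.Generic;
using System.Linq;

class ExitingGossipTargetsSketch
{
    // Simplified stand-in for Akka.Cluster.Member: a lower UpNumber means older.
    sealed class Node
    {
        public Node(string name, int upNumber, params string[] roles)
        {
            Name = name; UpNumber = upNumber; Roles = roles;
        }
        public string Name { get; }
        public int UpNumber { get; }
        public string[] Roles { get; }
    }

    static IEnumerable<Node> Targets(IReadOnlyList<Node> members, IEnumerable<Node> exiting)
    {
        var byAge = members.OrderBy(m => m.UpNumber).ToList();   // oldest first
        var targets = new HashSet<Node>(byAge.Take(2));          // two oldest overall

        // plus the two oldest carrying each role of an exiting member
        foreach (var role in exiting.SelectMany(m => m.Roles).Distinct())
            targets.UnionWith(byAge.Where(m => m.Roles.Contains(role)).Take(2));

        return targets;
    }

    static void Main()
    {
        var a1 = new Node("a1", 1); var a2 = new Node("a2", 2);
        var a3 = new Node("a3", 3); var a4 = new Node("a4", 4);
        var a5 = new Node("a5", 5, "role1", "role2");
        var a6 = new Node("a6", 6, "role1", "role3");
        var a7 = new Node("a7", 7, "role1");
        var a8 = new Node("a8", 8, "role1");
        var a9 = new Node("a9", 9, "role2");

        var all = new[] { a1, a2, a3, a4, a5, a6, a7, a8, a9 };
        var result = Targets(all, new[] { a5, a6 });

        // Prints "a1 a2 a5 a6 a9": the two oldest overall (a1, a2), the two oldest
        // with role1 (a5, a6), the two oldest with role2 (a5, a9), and the only
        // node with role3 (a6), matching the GossipSpec expectation.
        Console.WriteLine(string.Join(" ", result.Select(n => n.Name).OrderBy(n => n)));
    }
}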
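InternalDeleteMessagesBeforeSnapshot stays internal (hence the new InternalsVisibleTo entry for Akka.Cluster.Sharding), but the rule it encodes, keep the last keepNrOfBatches * snapshotAfter events and delete everything older, can also be applied from user code when handling SaveSnapshotSuccess. A sketch under assumed policy constants; SnapshotAfter, KeepNrOfBatches, and CounterActor are illustrative, not part of the patch.

using Akka.Persistence;

public class CounterActor : ReceivePersistentActor
{
    private const int SnapshotAfter = 100;   // assumed policy: snapshot every 100 events
    private const int KeepNrOfBatches = 2;   // assumed policy: keep the last 2 snapshot batches

    private int _state;

    public override string PersistenceId => "counter-1";

    public CounterActor()
    {
        Recover<int>(evt => _state += evt);
        Recover<SnapshotOffer>(offer => _state = (int)offer.Snapshot);

        Command<int>(evt => Persist(evt, e =>
        {
            _state += e;
            if (LastSequenceNr % SnapshotAfter == 0)
                SaveSnapshot(_state);
        }));

        Command<SaveSnapshotSuccess>(success =>
        {
            // Same arithmetic as the new helper: never delete everything at once,
            // because snapshots are typically stored with weaker consistency and
            // the retained events remain useful for debugging.
            var toSequenceNr = success.Metadata.SequenceNr - KeepNrOfBatches * SnapshotAfter;
            if (toSequenceNr > 0)
                DeleteMessages(toSequenceNr);
        });
    }
}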