Skip to content

Commit

Permalink
Updated ClusterSingletonProxy to also use OldestChangedBufferState
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed May 31, 2024
1 parent 01d8e57 commit 620e533
Showing 1 changed file with 80 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

#nullable enable
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
Expand Down Expand Up @@ -40,15 +41,15 @@ namespace Akka.Cluster.Tools.Singleton
public sealed class ClusterSingletonProxy : ReceiveActor
{
/// <summary>
/// TBD
/// Message used to tell the ClusterSingletonProxy to locate the currently active singleton.
/// </summary>
internal sealed class TryToIdentifySingleton : INoSerializationVerificationNeeded
{
/// <summary>
/// TBD
/// </summary>
public static TryToIdentifySingleton Instance { get; } = new();
private TryToIdentifySingleton() { }

private TryToIdentifySingleton()
{
}
}

/// <summary>
Expand All @@ -57,7 +58,8 @@ private TryToIdentifySingleton() { }
/// <returns>TBD</returns>
public static Config DefaultConfig()
{
return ConfigurationFactory.FromResource<ClusterSingletonManager>("Akka.Cluster.Tools.Singleton.reference.conf");
return ConfigurationFactory.FromResource<ClusterSingletonManager>(
"Akka.Cluster.Tools.Singleton.reference.conf");
}

/// <summary>
Expand All @@ -68,7 +70,7 @@ public static Config DefaultConfig()
/// which ends with the name you defined in `actorOf` when creating the <see cref="ClusterSingletonManager"/>.
/// </param>
/// <param name="settings">Cluster singleton proxy settings.</param>
/// <returns>TBD</returns>
/// <returns>The props for the singleton proxy</returns>
public static Props Props(string singletonManagerPath, ClusterSingletonProxySettings settings)
{
return Actor.Props.Create(() => new ClusterSingletonProxy(singletonManagerPath, settings))
Expand All @@ -83,16 +85,11 @@ public static Props Props(string singletonManagerPath, ClusterSingletonProxySett
private readonly string[] _singletonPath;
private int _identityCounter = 0;
private string _identityId;
private IActorRef _singleton = null;
private ICancelable _identityTimer = null;
private ImmutableSortedSet<Member> _membersByAge;
private IActorRef? _singleton;
private ICancelable? _identityTimer;
private OldestChangedBufferState _state;
private ILoggingAdapter _log;

/// <summary>
/// TBD
/// </summary>
/// <param name="singletonManagerPath">TBD</param>
/// <param name="settings">TBD</param>
public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxySettings settings)
{
_settings = settings;
Expand All @@ -102,7 +99,8 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
_memberAgeComparer = settings.ConsiderAppVersion
? MemberAgeOrdering.DescendingWithAppVersion
: MemberAgeOrdering.Descending;
_membersByAge = ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer);
_state = new OldestChangedBufferState(ImmutableSortedSet<Member>.Empty.WithComparer(_memberAgeComparer),
settings.Role);

Receive<ClusterEvent.CurrentClusterState>(s => HandleInitial(s));
Receive<ClusterEvent.MemberUp>(m => Add(m.Member));
Expand All @@ -119,63 +117,58 @@ public ClusterSingletonProxy(string singletonManagerPath, ClusterSingletonProxyS
/* do nothing */
});
Receive<ActorIdentity>(identity =>
{
if (identity.Subject != null)
{
if (identity.Subject != null)
{
// if the new singleton is defined, deliver all buffered messages
var subject = identity.Subject;
Log.Info("Singleton identified at [{0}]", subject.Path);
_singleton = subject;
Context.Watch(subject);
CancelTimer();
SendBuffered();
}
});
// if the new singleton is defined, deliver all buffered messages
var subject = identity.Subject;
Log.Info("Singleton identified at [{0}]", subject.Path);
_singleton = subject;
Context.Watch(subject);
CancelTimer();
SendBuffered();
}
});
Receive<TryToIdentifySingleton>(_ =>
{
var oldest = _membersByAge.FirstOrDefault();
if (oldest != null && _identityTimer != null)
{
var singletonAddress = new RootActorPath(oldest.Address) / _singletonPath;
Log.Debug("Trying to identify singleton at [{0}]", singletonAddress);
Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId));
}
});
{
var oldest = _state.CurrentOldest;
if (oldest != null && _identityTimer != null)
{
var singletonAddress = new RootActorPath(oldest.Address) / _singletonPath;
Log.Debug("Trying to identify singleton at [{0}]", singletonAddress);
Context.ActorSelection(singletonAddress).Tell(new Identify(_identityId));
}
});
Receive<Terminated>(terminated =>
{
if (Equals(_singleton, terminated.ActorRef))
{
if (Equals(_singleton, terminated.ActorRef))
{
// buffering mode, identification of new will start when old node is removed
_singleton = null;
}
});
// buffering mode, identification of new will start when old node is removed
_singleton = null;
}
});
ReceiveAny(msg =>
{
if (_singleton != null)
{
if (_singleton != null)
{
if (Log.IsDebugEnabled)
Log.Debug("Forwarding message of type [{0}] to current singleton instance at [{1}]", msg.GetType(), _singleton.Path);
_singleton.Forward(msg);
}
else
Buffer(msg);
});
if (Log.IsDebugEnabled)
Log.Debug("Forwarding message of type [{0}] to current singleton instance at [{1}]",
msg.GetType(), _singleton.Path);
_singleton.Forward(msg);
}
else
Buffer(msg);
});
}

private ILoggingAdapter Log => _log ??= Context.GetLogger();

/// <summary>
/// TBD
/// </summary>
protected override void PreStart()
{
CancelTimer();
_cluster.Subscribe(Self, typeof(ClusterEvent.IMemberEvent));
}

/// <summary>
/// TBD
/// </summary>
protected override void PostStop()
{
CancelTimer();
Expand All @@ -200,10 +193,19 @@ private bool MatchingRole(Member member)

private void HandleInitial(ClusterEvent.CurrentClusterState state)
{
TrackChanges(() =>
_membersByAge = state.Members
.Where(m => m.Status == MemberStatus.Up && MatchingRole(m))
.ToImmutableSortedSet(_memberAgeComparer));
var membersByAge = state.Members
.Where(m => m.Status == MemberStatus.Up && MatchingRole(m))
.ToImmutableSortedSet(_memberAgeComparer);

_state = _state with { MembersByAge = membersByAge};

// compute the initial oldest
var (newState, _) = _state.ComputeNextOldest();
_state = newState;

// if the oldest is defined, start the identification process
if (_state.CurrentOldest != null)
IdentifySingleton();
}

// Discard old singleton ActorRef and send a periodic message to self to identify the singleton.
Expand All @@ -221,31 +223,27 @@ private void IdentifySingleton()
message: TryToIdentifySingleton.Instance,
sender: Self);
}

private void TrackChanges(Action block)
{
var before = _membersByAge.FirstOrDefault();
block();
var after = _membersByAge.FirstOrDefault();

// if the head has changed, I need to find the new singleton
if (!Equals(before, after)) IdentifySingleton();
}


private void Add(Member member)
{
if (MatchingRole(member))
TrackChanges(() =>
{
_membersByAge = _membersByAge.Remove(member); //replace
_membersByAge = _membersByAge.Add(member);
});
{
var (newState, oldestChanged) = _state.AddMember(member);
_state = newState;
if (oldestChanged)
IdentifySingleton();
}
}

private void Remove(Member member)
{
if (MatchingRole(member))
TrackChanges(() => _membersByAge = _membersByAge.Remove(member));
{
var (newState, oldestChanged) = _state.RemoveMember(member);
_state = newState;
if (oldestChanged)
IdentifySingleton();
}
}

private string CreateIdentifyId(int i)
Expand All @@ -256,7 +254,8 @@ private string CreateIdentifyId(int i)
private void Buffer(object message)
{
if (_settings.BufferSize == 0)
Log.Debug("Singleton not available and buffering is disabled, dropping message [{0}]", message.GetType());
Log.Debug("Singleton not available and buffering is disabled, dropping message [{0}]",
message.GetType());
else if (_buffer.Count == _settings.BufferSize)
{
var first = _buffer.Dequeue();
Expand All @@ -276,8 +275,8 @@ private void SendBuffered()
while (_buffer.Count != 0)
{
var pair = _buffer.Dequeue();
_singleton.Tell(pair.Key, pair.Value);
_singleton?.Tell(pair.Key, pair.Value);
}
}
}
}
}

0 comments on commit 620e533

Please sign in to comment.