Skip to content

Commit

Permalink
Separate wire protocol from internal models (#6206)
Browse files Browse the repository at this point in the history
* Separate wire protocol from internal models

* Move .proto file and fix wire compat
  • Loading branch information
Arkatufus authored Oct 27, 2022
1 parent 9f84438 commit 61df6fc
Show file tree
Hide file tree
Showing 12 changed files with 997 additions and 650 deletions.
3 changes: 2 additions & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ Target "Protobuf" <| fun _ ->
("Persistence.proto", "/src/core/Akka.Persistence/Serialization/Proto/");
("StreamRefMessages.proto", "/src/core/Akka.Streams/Serialization/Proto/");
("ReplicatorMessages.proto", "/src/contrib/cluster/Akka.DistributedData/Serialization/Proto/");
("ReplicatedDataMessages.proto", "/src/contrib/cluster/Akka.DistributedData/Serialization/Proto/"); ]
("ReplicatedDataMessages.proto", "/src/contrib/cluster/Akka.DistributedData/Serialization/Proto/")
("ClusterMetricsMessages.proto", "/src/contrib/cluster/Akka.Cluster.Metrics/Serialization/Proto/") ]

printfn "Using proto.exe: %s" protocPath

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Option<WeightedRoutees> UpdateWeightedRoutees()
///
/// The supervision strategy of the router actor can be configured with
/// [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
/// a strategy of “always escalate”. This means that errors are passed up to the
/// a strategy of [[always escalate]]. This means that errors are passed up to the
/// router's supervisor for handling.
///
/// The router's supervisor will treat the error as an error with the router itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ public interface IClusterMetricMessage { }
/// Envelope adding a sender address to the cluster metrics gossip.
/// </summary>
[InternalApi]
public sealed partial class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
public sealed class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
{
/// <summary>
/// Akka's actor address
/// </summary>
public Actor.Address FromAddress { get; }
public MetricsGossip Gossip { get; }
public bool Reply { get; }

/// <summary>
/// Creates new instance of <see cref="MetricsGossipEnvelope"/>
Expand Down

Large diffs are not rendered by default.

33 changes: 28 additions & 5 deletions src/contrib/cluster/Akka.Cluster.Metrics/Serialization/EWMA.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public static partial class Types
/// the sampled value resulting from the previous smoothing iteration.
/// This value is always used as the previous EWMA to calculate the new EWMA.
/// </summary>
public sealed partial class EWMA
public sealed class EWMA : IEquatable<EWMA>
{
public double Value { get; }
public double Alpha { get; }

/// <summary>
/// Creates new instance of <see cref="EWMA"/>
/// </summary>
Expand All @@ -47,10 +50,10 @@ public sealed partial class EWMA
public EWMA(double value, double alpha)
{
if (alpha < 0 || alpha > 1)
throw new ArgumentException(nameof(alpha), "alpha must be between 0.0 and 1.0");
throw new ArgumentException("alpha must be between 0.0 and 1.0", nameof(alpha));

value_ = value;
alpha_ = alpha;
Value = value;
Alpha = alpha;
}

/// <summary>
Expand Down Expand Up @@ -83,11 +86,31 @@ public static double GetAlpha(TimeSpan halfLife, TimeSpan collectInterval)

var halfLifeMillis = halfLife.TotalMilliseconds;
if (halfLifeMillis <= 0)
throw new ArgumentException(nameof(halfLife), "halfLife must be > 0 s");
throw new ArgumentException("halfLife must be > 0 s", nameof(halfLife));

var decayRate = logOf2 / halfLifeMillis;
return 1 - Math.Exp(-decayRate * collectInterval.TotalMilliseconds);
}

public bool Equals(EWMA other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Value.Equals(other.Value) && Alpha.Equals(other.Alpha);
}

public override bool Equals(object obj)
{
return obj is EWMA other && Equals(other);
}

public override int GetHashCode()
{
unchecked
{
return (Value.GetHashCode() * 397) ^ Alpha.GetHashCode();
}
}
}
}
}
Expand Down
15 changes: 4 additions & 11 deletions src/contrib/cluster/Akka.Cluster.Metrics/Serialization/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static partial class Types
///
/// Equality of Metric is based on its name index.
/// </summary>
public sealed partial class Metric
public sealed class Metric: IEquatable<Metric>
{
/// <summary>
/// Metric average value
Expand Down Expand Up @@ -79,7 +79,6 @@ public Metric(string name, AnyNumber value, Option<EWMA> average)
Name = name;
Value = value;
Average = average;
ewma_ = average.HasValue ? average.Value : default(EWMA);
}

/// <summary>
Expand Down Expand Up @@ -163,21 +162,15 @@ public static Either<long, double> ConvertNumber(AnyNumber number)
}
}

/*
* Two methods below, Equals and GetHashCode, should be used instead of generated in ClusterMetrics.Messages.g.cs
* file. Since we do not have an option to not generate those methods for this particular class,
* just stip them from generated code and paste here, with adding Address property check
*/
public override bool Equals(object obj)
=> obj is Metric other && Equals(other);



public bool Equals(Metric other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Name == other.Name;
return Name.Equals(other.Name);
}


public override int GetHashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ namespace Akka.Cluster.Metrics.Serialization
/// Metrics gossip message
/// </summary>
[InternalApi]
public sealed partial class MetricsGossip
public sealed class MetricsGossip
{
public IImmutableSet<NodeMetrics> Nodes { get; private set; } = ImmutableHashSet<NodeMetrics>.Empty;
public IImmutableSet<NodeMetrics> Nodes { get; }

/// <summary>
/// Empty metrics gossip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Collections.Immutable;
using System.Linq;
using Akka.Util;
using Google.Protobuf.Collections;

namespace Akka.Cluster.Metrics.Serialization
{
Expand All @@ -20,9 +19,11 @@ namespace Akka.Cluster.Metrics.Serialization
///
/// Equality of NodeMetrics is based on its address.
/// </summary>
public sealed partial class NodeMetrics
public sealed partial class NodeMetrics : IEquatable<NodeMetrics>
{
public Actor.Address Address { get; private set; }
public Actor.Address Address { get; }
public ImmutableList<Types.Metric> Metrics { get; }
public long Timestamp { get; }

/// <summary>
/// Creates new instance of <see cref="NodeMetrics"/>
Expand All @@ -33,9 +34,8 @@ public sealed partial class NodeMetrics
public NodeMetrics(Actor.Address address, long timestamp, IEnumerable<Types.Metric> metrics)
{
Address = address;
timestamp_ = timestamp;
metrics_ = new RepeatedField<Types.Metric>();
metrics_.AddRange(metrics);
Timestamp = timestamp;
Metrics = metrics.ToImmutableList();
}

/// <summary>
Expand Down Expand Up @@ -93,19 +93,19 @@ public NodeMetrics Update(NodeMetrics that)
* just stip them from generated code and paste here, with adding Address property check
*/


public override bool Equals(object obj)
=> obj is NodeMetrics other && Equals(other);

public bool Equals(NodeMetrics other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Equals(Address, other.Address);
return Address.Equals(other.Address);
}


public override int GetHashCode()
{
return (Address != null ? Address.GetHashCode() : 0);
return Address.GetHashCode();
}
}
}
Loading

0 comments on commit 61df6fc

Please sign in to comment.