Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report cause for Akka/IO TCP CommandFailed events #6221

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3311,6 +3311,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3525,8 +3530,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3318,6 +3318,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3532,8 +3537,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3311,6 +3311,11 @@ namespace Akka.IO
public static byte[] op_Explicit(Akka.IO.ByteString byteString) { }
public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class ConnectException : System.Exception
{
public ConnectException(string message) { }
}
public class Dns : Akka.Actor.ExtensionIdProvider<Akka.IO.DnsExt>
{
public static readonly Akka.IO.Dns Instance;
Expand Down Expand Up @@ -3525,8 +3530,13 @@ namespace Akka.IO
public sealed class CommandFailed : Akka.IO.Tcp.Event
{
public CommandFailed(Akka.IO.Tcp.Command cmd) { }
public Akka.Util.Option<System.Exception> Cause { get; }
[Akka.Annotations.InternalApiAttribute()]
public string CauseString { get; }
public Akka.IO.Tcp.Command Cmd { get; }
public override string ToString() { }
[Akka.Annotations.InternalApiAttribute()]
public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { }
}
public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable<Akka.IO.Tcp.SimpleWriteCommand>, System.Collections.IEnumerable
{
Expand Down
32 changes: 24 additions & 8 deletions src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
using System.Linq;
using System.Net;
using Akka.Actor;
using Akka.Annotations;
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Event;
using Akka.IO.Buffers;
using Akka.Util;

namespace Akka.IO
{
Expand Down Expand Up @@ -85,7 +87,7 @@ private SocketConnected() { }
public class Message : INoSerializationVerificationNeeded { }

#region user commands

// COMMANDS
/// <summary>
/// TBD
Expand Down Expand Up @@ -733,7 +735,7 @@ public override string ToString() =>
#endregion

#region user events

/// <summary>
/// Common interface for all events generated by the TCP layer actors.
/// </summary>
Expand Down Expand Up @@ -808,18 +810,32 @@ public sealed class CommandFailed : Event
/// TBD
/// </summary>
/// <param name="cmd">TBD</param>
public CommandFailed(Command cmd)
{
Cmd = cmd;
}
public CommandFailed(Command cmd) => Cmd = cmd;

/// <summary>
/// TBD
/// </summary>
public Command Cmd { get; }

public override string ToString() =>
$"CommandFailed({Cmd})";
/// <summary>
/// Optionally contains the cause why the command failed.
/// </summary>
public Option<Exception> Cause { get; private set; } = Option<Exception>.None;

/// <summary>
/// Creates a copy of this object with a new cause set.
/// </summary>
[InternalApi]
public CommandFailed WithCause(Exception cause)
{
// Needs to be added with a mutable property for compatibility reasons
return new CommandFailed(Cmd) { Cause = cause };
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this holds true in C# or we could just add a second constructor instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could just add a constructor overload and that would work

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I think this is probably a fine way of doing it

}

[InternalApi]
public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : "";

public override string ToString() => $"CommandFailed({Cmd}){CauseString}";
}

/// <summary>
Expand Down
10 changes: 8 additions & 2 deletions src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ enum ConnectionStatus

private IActorRef _watchedActor = Context.System.DeadLetters;

private readonly IOException droppingWriteBecauseWritingIsSuspendedException =
new IOException("Dropping write because writing is suspended");

private readonly IOException droppingWriteBecauseQueueIsFullException =
new IOException("Dropping write because queue is full");

protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option<int> writeCommandsBufferMaxSize)
{
if (socket == null) throw new ArgumentNullException(nameof(socket));
Expand Down Expand Up @@ -328,7 +334,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
if (HasStatus(ConnectionStatus.WritingSuspended))
{
if (_traceLogging) Log.Debug("Dropping write because writing is suspended");
Sender.Tell(write.FailureMessage);
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseWritingIsSuspendedException));
}

if (HasStatus(ConnectionStatus.Sending))
Expand Down Expand Up @@ -405,7 +411,7 @@ private Receive HandleWriteMessages(ConnectionInfo info)
private void DropWrite(ConnectionInfo info, WriteCommand write)
{
if (_traceLogging) Log.Debug("Dropping write because queue is full");
Sender.Tell(write.FailureMessage);
Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseQueueIsFullException));
if (info.UseResumeWriting) SetStatus(ConnectionStatus.WritingSuspended);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private Receive Initializing() => message =>
return true;

case Status.Failure fail:
_bindCommander.Tell(_bind.FailureMessage);
_bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause));
_log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress);
Context.Stop(Self);
_binding = false;
Expand Down
44 changes: 27 additions & 17 deletions src/core/Akka/IO/TcpOutgoingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Annotations;
using Akka.Util;

namespace Akka.IO
{
/// <summary>
/// TBD
/// An actor handling the connection state machine for an outgoing connection
/// to be established.
/// </summary>
internal sealed class TcpOutgoingConnection : TcpConnection
{
Expand All @@ -26,6 +28,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection

private SocketAsyncEventArgs _connectArgs;

private readonly ConnectException finishConnectNeverReturnedTrueException =
new ConnectException("Could not establish connection because finishConnect never returned true");

public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
: base(
tcp,
Expand Down Expand Up @@ -61,11 +66,11 @@ private void ReleaseConnectionSocketArgs()
}
}

private void Stop()
private void Stop(Exception cause)
{
ReleaseConnectionSocketArgs();

StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage));
StopWith(new CloseInformation(new HashSet<IActorRef>(new[] {_commander}), _connect.FailureMessage.WithCause(cause)));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -77,30 +82,29 @@ private void ReportConnectFailure(Action thunk)
}
catch (Exception e)
{
Log.Error(e, "Could not establish connection to [{0}].", _connect.RemoteAddress);
Stop();
Log.Debug(e, "Could not establish connection to [{0}] due to {1}", _connect.RemoteAddress, e.Message);
Stop(e);
}
}

protected override void PreStart()
{
ReportConnectFailure(() =>
{
if (_connect.RemoteAddress is DnsEndPoint)
if (_connect.RemoteAddress is DnsEndPoint remoteAddress)
{
var remoteAddress = (DnsEndPoint) _connect.RemoteAddress;
Log.Debug("Resolving {0} before connecting", remoteAddress.Host);
var resolved = Dns.ResolveName(remoteAddress.Host, Context.System, Self);
if (resolved == null)
Become(Resolving(remoteAddress));
else if(resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
else if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families
Register(new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port));
else // one or the other
Register(new IPEndPoint(resolved.Addr, remoteAddress.Port), null);
}
else if(_connect.RemoteAddress is IPEndPoint)
else if (_connect.RemoteAddress is IPEndPoint point)
{
Register((IPEndPoint)_connect.RemoteAddress, null);
Register(point, null);
}
else throw new NotSupportedException($"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported");
});
Expand All @@ -123,8 +127,7 @@ private Receive Resolving(DnsEndPoint remoteAddress)
{
return message =>
{
var resolved = message as Dns.Resolved;
if (resolved != null)
if (message is Dns.Resolved resolved)
{
if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses
{
Expand All @@ -144,7 +147,6 @@ private Receive Resolving(DnsEndPoint remoteAddress)
};
}


private void Register(IPEndPoint address, IPEndPoint fallbackAddress)
{
ReportConnectFailure(() =>
Expand All @@ -165,7 +167,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
{
return message =>
{
if (message is IO.Tcp.SocketConnected)
if (message is Tcp.SocketConnected)
{
if (args.SocketError == SocketError.Success)
{
Expand Down Expand Up @@ -202,19 +204,27 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr
else
{
Log.Debug("Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)");
Stop();
Stop(finishConnectNeverReturnedTrueException);
}
return true;
}
if (message is ReceiveTimeout)
{
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout
Log.Error("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop();
Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
return true;
}
return false;
};
}
}

[InternalApi]
public class ConnectException : Exception
{
public ConnectException(string message)
: base(message)
{ }
}
}