diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt index 9cbd3d435c3..a9cc7e1c2a4 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.verified.txt @@ -3328,6 +3328,11 @@ namespace Akka.IO public static byte[] op_Explicit(Akka.IO.ByteString byteString) { } public static bool !=(Akka.IO.ByteString x, Akka.IO.ByteString y) { } } + [Akka.Annotations.InternalApiAttribute()] + public class ConnectException : System.Exception + { + public ConnectException(string message) { } + } public class Dns : Akka.Actor.ExtensionIdProvider { public static readonly Akka.IO.Dns Instance; @@ -3542,8 +3547,13 @@ namespace Akka.IO public sealed class CommandFailed : Akka.IO.Tcp.Event { public CommandFailed(Akka.IO.Tcp.Command cmd) { } + public Akka.Util.Option Cause { get; } + [Akka.Annotations.InternalApiAttribute()] + public string CauseString { get; } public Akka.IO.Tcp.Command Cmd { get; } public override string ToString() { } + [Akka.Annotations.InternalApiAttribute()] + public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { } } public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index 78996792f51..b6f190cc664 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -11,10 +11,12 @@ using System.Linq; using System.Net; using Akka.Actor; +using Akka.Annotations; using Akka.Configuration; using Akka.Dispatch; using Akka.Event; using Akka.IO.Buffers; +using Akka.Util; namespace Akka.IO { @@ -85,7 +87,7 @@ private SocketConnected() { } public class Message : INoSerializationVerificationNeeded { } #region user commands - + // COMMANDS /// /// TBD @@ -729,7 +731,7 @@ public override string ToString() => #endregion #region user events - + /// /// Common interface for all events generated by the TCP layer actors. /// @@ -804,18 +806,32 @@ public sealed class CommandFailed : Event /// TBD /// /// TBD - public CommandFailed(Command cmd) - { - Cmd = cmd; - } + public CommandFailed(Command cmd) => Cmd = cmd; /// /// TBD /// public Command Cmd { get; } - public override string ToString() => - $"CommandFailed({Cmd})"; + /// + /// Optionally contains the cause why the command failed. + /// + public Option Cause { get; private set; } = Option.None; + + /// + /// Creates a copy of this object with a new cause set. + /// + [InternalApi] + public CommandFailed WithCause(Exception cause) + { + // Needs to be added with a mutable property for compatibility reasons + return new CommandFailed(Cmd) { Cause = cause }; + } + + [InternalApi] + public string CauseString => Cause.HasValue ? $" because of {Cause.Value.Message}" : ""; + + public override string ToString() => $"CommandFailed({Cmd}){CauseString}"; } /// diff --git a/src/core/Akka/IO/TcpConnection.cs b/src/core/Akka/IO/TcpConnection.cs index a25a5ce799c..e21e61f8f56 100644 --- a/src/core/Akka/IO/TcpConnection.cs +++ b/src/core/Akka/IO/TcpConnection.cs @@ -104,6 +104,12 @@ enum ConnectionStatus private IActorRef _watchedActor = Context.System.DeadLetters; + private readonly IOException droppingWriteBecauseWritingIsSuspendedException = + new IOException("Dropping write because writing is suspended"); + + private readonly IOException droppingWriteBecauseQueueIsFullException = + new IOException("Dropping write because queue is full"); + protected TcpConnection(TcpExt tcp, Socket socket, bool pullMode, Option writeCommandsBufferMaxSize) { if (socket == null) throw new ArgumentNullException(nameof(socket)); @@ -328,7 +334,7 @@ private Receive HandleWriteMessages(ConnectionInfo info) if (HasStatus(ConnectionStatus.WritingSuspended)) { if (_traceLogging) Log.Debug("Dropping write because writing is suspended"); - Sender.Tell(write.FailureMessage); + Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseWritingIsSuspendedException)); } if (HasStatus(ConnectionStatus.Sending)) @@ -405,7 +411,7 @@ private Receive HandleWriteMessages(ConnectionInfo info) private void DropWrite(ConnectionInfo info, WriteCommand write) { if (_traceLogging) Log.Debug("Dropping write because queue is full"); - Sender.Tell(write.FailureMessage); + Sender.Tell(write.FailureMessage.WithCause(droppingWriteBecauseQueueIsFullException)); if (info.UseResumeWriting) SetStatus(ConnectionStatus.WritingSuspended); } diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index a26d8f80592..8397742191a 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -57,7 +57,7 @@ public TcpListener(TcpExt tcp, IActorRef bindCommander, } catch (Exception e) { - _bindCommander.Tell(bind.FailureMessage); + _bindCommander.Tell(_bind.FailureMessage.WithCause(e)); _log.Error(e, "Bind failed for TCP channel on endpoint [{0}]", bind.LocalAddress); Context.Stop(Self); } diff --git a/src/core/Akka/IO/TcpOutgoingConnection.cs b/src/core/Akka/IO/TcpOutgoingConnection.cs index add58d20723..689a47869f3 100644 --- a/src/core/Akka/IO/TcpOutgoingConnection.cs +++ b/src/core/Akka/IO/TcpOutgoingConnection.cs @@ -12,12 +12,14 @@ using System.Net.Sockets; using System.Runtime.CompilerServices; using Akka.Actor; +using Akka.Annotations; using Akka.Util; namespace Akka.IO { /// - /// TBD + /// An actor handling the connection state machine for an outgoing connection + /// to be established. /// internal sealed class TcpOutgoingConnection : TcpConnection { @@ -26,6 +28,9 @@ internal sealed class TcpOutgoingConnection : TcpConnection private SocketAsyncEventArgs _connectArgs; + private readonly ConnectException finishConnectNeverReturnedTrueException = + new ConnectException("Could not establish connection because finishConnect never returned true"); + public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect) : base( tcp, @@ -61,11 +66,11 @@ private void ReleaseConnectionSocketArgs() } } - private void Stop() + private void Stop(Exception cause) { ReleaseConnectionSocketArgs(); - StopWith(new CloseInformation(new HashSet(new[] {_commander}), _connect.FailureMessage)); + StopWith(new CloseInformation(new HashSet(new[] {_commander}), _connect.FailureMessage.WithCause(cause))); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -77,8 +82,8 @@ private void ReportConnectFailure(Action thunk) } catch (Exception e) { - Log.Error(e, "Could not establish connection to [{0}].", _connect.RemoteAddress); - Stop(); + Log.Debug(e, "Could not establish connection to [{0}] due to {1}", _connect.RemoteAddress, e.Message); + Stop(e); } } @@ -86,21 +91,20 @@ protected override void PreStart() { ReportConnectFailure(() => { - if (_connect.RemoteAddress is DnsEndPoint) + if (_connect.RemoteAddress is DnsEndPoint remoteAddress) { - var remoteAddress = (DnsEndPoint) _connect.RemoteAddress; Log.Debug("Resolving {0} before connecting", remoteAddress.Host); var resolved = Dns.ResolveName(remoteAddress.Host, Context.System, Self); if (resolved == null) Become(Resolving(remoteAddress)); - else if(resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families + else if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // one of both families Register(new IPEndPoint(resolved.Ipv4.FirstOrDefault(), remoteAddress.Port), new IPEndPoint(resolved.Ipv6.FirstOrDefault(), remoteAddress.Port)); else // one or the other Register(new IPEndPoint(resolved.Addr, remoteAddress.Port), null); } - else if(_connect.RemoteAddress is IPEndPoint) + else if (_connect.RemoteAddress is IPEndPoint point) { - Register((IPEndPoint)_connect.RemoteAddress, null); + Register(point, null); } else throw new NotSupportedException($"Couldn't connect to [{_connect.RemoteAddress}]: only IP and DNS-based endpoints are supported"); }); @@ -123,8 +127,7 @@ private Receive Resolving(DnsEndPoint remoteAddress) { return message => { - var resolved = message as Dns.Resolved; - if (resolved != null) + if (message is Dns.Resolved resolved) { if (resolved.Ipv4.Any() && resolved.Ipv6.Any()) // multiple addresses { @@ -144,7 +147,6 @@ private Receive Resolving(DnsEndPoint remoteAddress) }; } - private void Register(IPEndPoint address, IPEndPoint fallbackAddress) { ReportConnectFailure(() => @@ -165,7 +167,7 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr { return message => { - if (message is IO.Tcp.SocketConnected) + if (message is Tcp.SocketConnected) { if (args.SocketError == SocketError.Success) { @@ -202,19 +204,27 @@ private Receive Connecting(int remainingFinishConnectRetries, SocketAsyncEventAr else { Log.Debug("Could not establish connection because finishConnect never returned true (consider increasing akka.io.tcp.finish-connect-retries)"); - Stop(); + Stop(finishConnectNeverReturnedTrueException); } return true; } if (message is ReceiveTimeout) { if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout - Log.Error("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress); - Stop(); + Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress); + Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired")); return true; } return false; }; } } + + [InternalApi] + public class ConnectException : Exception + { + public ConnectException(string message) + : base(message) + { } + } }