Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add current-outgoing-connect-attempts counter to Sockets #66651

Merged
merged 4 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,11 +1037,7 @@ public Socket Accept()
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);
}

SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);
throw;
}

Expand All @@ -1051,12 +1047,12 @@ public Socket Accept()
Debug.Assert(acceptedSocketHandle.IsInvalid);
UpdateAcceptSocketErrorForDisposed(ref errorCode);

if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode);
SocketsTelemetry.Log.AfterAccept(errorCode);

UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}

if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Success);
SocketsTelemetry.Log.AfterAccept(SocketError.Success);

Debug.Assert(!acceptedSocketHandle.IsInvalid);

Expand Down Expand Up @@ -2622,10 +2618,7 @@ private bool AcceptAsync(SocketAsyncEventArgs e, CancellationToken cancellationT
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);
}
SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);

// Clear in-use flag on event args object.
e.Complete();
Expand Down Expand Up @@ -2703,14 +2696,19 @@ internal bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket, bool saeaCan

WildcardBindForConnectIfNecessary(endPointSnapshot.AddressFamily);

if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.ConnectStart(e._socketAddress!);
}
SocketsTelemetry.Log.ConnectStart(e._socketAddress!);

// Prepare for the native call.
e.StartOperationCommon(this, SocketAsyncOperation.Connect);
e.StartOperationConnect(saeaMultiConnectCancelable: false, userSocket);
try
{
e.StartOperationCommon(this, SocketAsyncOperation.Connect);
e.StartOperationConnect(saeaMultiConnectCancelable: false, userSocket);
}
catch (Exception ex)
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
throw;
}

// Make the native call.
try
Expand All @@ -2724,10 +2722,7 @@ internal bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket, bool saeaCan
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
}
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);

_localEndPoint = null;

Expand Down Expand Up @@ -3085,20 +3080,15 @@ private Internals.SocketAddress Serialize(ref EndPoint remoteEP)

private void DoConnect(EndPoint endPointSnapshot, Internals.SocketAddress socketAddress)
{
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStart(socketAddress);

SocketsTelemetry.Log.ConnectStart(socketAddress);
SocketError errorCode;
try
{
errorCode = SocketPal.Connect(_handle, socketAddress.Buffer, socketAddress.Size);
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
}

SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
throw;
}

Expand All @@ -3111,12 +3101,12 @@ private void DoConnect(EndPoint endPointSnapshot, Internals.SocketAddress socket
UpdateStatusAfterSocketError(socketException);
if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, socketException);

if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(errorCode);
SocketsTelemetry.Log.AfterConnect(errorCode);

throw socketException;
}

if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(SocketError.Success);
SocketsTelemetry.Log.AfterConnect(SocketError.Success);

if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"connection to:{endPointSnapshot}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ public int BytesTransferred

private void OnCompletedInternal()
{
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
// The following check checks if the operation was Accept (1) or Connect (2)
if (LastOperation <= SocketAsyncOperation.Connect)
MihaZupan marked this conversation as resolved.
Show resolved Hide resolved
{
AfterConnectAcceptTelemetry();
}

OnCompleted(this);
}
Expand All @@ -222,6 +226,10 @@ private void AfterConnectAcceptTelemetry()
case SocketAsyncOperation.Connect:
SocketsTelemetry.Log.AfterConnect(SocketError);
break;

default:
Debug.Fail($"Callers should guard against calling this method for '{LastOperation}'");
break;
}
}

Expand Down Expand Up @@ -999,7 +1007,11 @@ private void FinishOperationSync(SocketError socketError, int bytesTransferred,
FinishOperationSyncFailure(socketError, bytesTransferred, flags);
}

if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
// The following check checks if the operation was Accept (1) or Connect (2)
if (LastOperation <= SocketAsyncOperation.Connect)
{
AfterConnectAcceptTelemetry();
}
}

private static void LogBytesTransferEvents(SocketType? socketType, SocketAsyncOperation operation, int bytesTransferred)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ internal sealed class SocketsTelemetry : EventSource
{
public static readonly SocketsTelemetry Log = new SocketsTelemetry();

private PollingCounter? _currentOutgoingConnectAttemptsCounter;
private PollingCounter? _outgoingConnectionsEstablishedCounter;
private PollingCounter? _incomingConnectionsEstablishedCounter;
private PollingCounter? _bytesReceivedCounter;
private PollingCounter? _bytesSentCounter;
private PollingCounter? _datagramsReceivedCounter;
private PollingCounter? _datagramsSentCounter;

private long _currentOutgoingConnectAttempts;
private long _outgoingConnectionsEstablished;
private long _incomingConnectionsEstablished;
private long _bytesReceived;
Expand Down Expand Up @@ -77,6 +79,8 @@ private void AcceptFailed(SocketError error, string? exceptionMessage)
[NonEvent]
public void ConnectStart(Internals.SocketAddress address)
{
Interlocked.Increment(ref _currentOutgoingConnectAttempts);
stephentoub marked this conversation as resolved.
Show resolved Hide resolved

if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
ConnectStart(address.ToString());
Expand All @@ -86,6 +90,9 @@ public void ConnectStart(Internals.SocketAddress address)
[NonEvent]
public void AfterConnect(SocketError error, string? exceptionMessage = null)
{
long newCount = Interlocked.Decrement(ref _currentOutgoingConnectAttempts);
Debug.Assert(newCount >= 0);

if (error == SocketError.Success)
{
Debug.Assert(exceptionMessage is null);
Expand Down Expand Up @@ -165,6 +172,10 @@ protected override void OnEventCommand(EventCommandEventArgs command)
{
// This is the convention for initializing counters in the RuntimeEventSource (lazily on the first enable command).

_currentOutgoingConnectAttemptsCounter ??= new PollingCounter("current-outgoing-connect-attempts", this, () => Interlocked.Read(ref _currentOutgoingConnectAttempts))
{
DisplayName = "Current Outgoing Connect Attempts",
};
_outgoingConnectionsEstablishedCounter ??= new PollingCounter("outgoing-connections-established", this, () => Interlocked.Read(ref _outgoingConnectionsEstablished))
{
DisplayName = "Outgoing Connections Established",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,40 @@ public void EventSource_SocketConnectsLoopback_LogsConnectAcceptStartStop(string
listener.AddActivityTracking();

var events = new ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)>();
await listener.RunWithCallbackAsync(e => events.Enqueue((e, e.ActivityId)), async () =>
await listener.RunWithCallbackAsync(e =>
{
events.Enqueue((e, e.ActivityId));

if (e.EventName == "ConnectStart")
{
// Make sure we observe a non-zero current-outgoing-connect-attempts counter
WaitForEventCountersAsync(events).GetAwaiter().GetResult();
}
},
async () =>
{
using var server = new Socket(SocketType.Stream, ProtocolType.Tcp);
server.Bind(new IPEndPoint(IPAddress.Loopback, 0));
server.Listen();

using var client = new Socket(SocketType.Stream, ProtocolType.Tcp);

Task acceptTask = GetHelperBase(acceptMethod).AcceptAsync(server);
await WaitForEventAsync(events, "AcceptStart");
Task connectTask = GetHelperBase(connectMethod).ConnectAsync(client, server.LocalEndPoint);
await WaitForEventAsync(events, "ConnectStart");
await WaitForEventCountersAsync(events); // Wait for current-outgoing-connect-attempts = 1

await GetHelperBase(connectMethod).ConnectAsync(client, server.LocalEndPoint);
await acceptTask;
await GetHelperBase(acceptMethod).AcceptAsync(server);

await WaitForEventAsync(events, "AcceptStop");
await WaitForEventAsync(events, "ConnectStop");
await connectTask;

await WaitForEventCountersAsync(events);
});

VerifyEvents(events, connect: true, expectedCount: 1);
VerifyEvents(events, connect: false, expectedCount: 1);
VerifyEventCounters(events, connectCount: 1);
VerifyEventCounters(events, connectCount: 1, hasCurrentConnectCounter: true);
}, connectMethod, acceptMethod).Dispose();
}

Expand Down Expand Up @@ -445,7 +456,7 @@ private static void VerifyEvents(ConcurrentQueue<(EventWrittenEventArgs Event, G
}
}

private static void VerifyEventCounters(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, int connectCount, bool connectOnly = false, bool shouldHaveTransferedBytes = false, bool shouldHaveDatagrams = false)
private static void VerifyEventCounters(ConcurrentQueue<(EventWrittenEventArgs Event, Guid ActivityId)> events, int connectCount, bool hasCurrentConnectCounter = false, bool connectOnly = false, bool shouldHaveTransferedBytes = false, bool shouldHaveDatagrams = false)
{
Dictionary<string, double[]> eventCounters = events
.Where(e => e.Event.EventName == "EventCounters")
Expand All @@ -459,6 +470,13 @@ private static void VerifyEventCounters(ConcurrentQueue<(EventWrittenEventArgs E
Assert.True(eventCounters.TryGetValue("incoming-connections-established", out double[] incomingConnections));
Assert.Equal(connectOnly ? 0 : connectCount, incomingConnections[^1]);

Assert.True(eventCounters.TryGetValue("current-outgoing-connect-attempts", out double[] currentOutgoingConnectAttempts));
if (hasCurrentConnectCounter)
{
Assert.Contains(currentOutgoingConnectAttempts, c => c > 0);
}
Assert.Equal(0, currentOutgoingConnectAttempts[^1]);

Assert.True(eventCounters.TryGetValue("bytes-received", out double[] bytesReceived));
if (shouldHaveTransferedBytes)
{
Expand Down