Skip to content

Commit

Permalink
Close accept loop when closing connection for Quic (#44885)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkotalik authored Nov 26, 2020
1 parent 31ffad2 commit 1503364
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,21 @@ private uint HandleEventShutdownInitiatedByTransport(ref ConnectionEvent connect
_connectTcs.SetException(ExceptionDispatchInfo.SetCurrentStackTrace(ex));
}

_acceptQueue.Writer.Complete();

return MsQuicStatusCodes.Success;
}

private uint HandleEventShutdownInitiatedByPeer(ref ConnectionEvent connectionEvent)
{
_abortErrorCode = connectionEvent.Data.ShutdownInitiatedByPeer.ErrorCode;
_acceptQueue.Writer.Complete();
return MsQuicStatusCodes.Success;
}

private uint HandleEventShutdownComplete(ref ConnectionEvent connectionEvent)
{
_shutdownTcs.SetResult(MsQuicStatusCodes.Success);

// Stop accepting new streams.
_acceptQueue?.Writer.Complete();
return MsQuicStatusCodes.Success;
}

Expand Down Expand Up @@ -291,7 +291,7 @@ private MsQuicStream StreamOpen(

private void SetCallbackHandler()
{
Debug.Assert(!_handle.IsAllocated);
Debug.Assert(!_handle.IsAllocated, "callback handler allocated already");
_handle = GCHandle.Alloc(this);

MsQuicApi.Api.SetCallbackHandlerDelegate(
Expand All @@ -310,8 +310,6 @@ private ValueTask ShutdownAsync(
ErrorCode);
QuicExceptionHelpers.ThrowIfFailed(status, "Failed to shutdown connection.");

Debug.Assert(_shutdownTcs.Task.IsCompleted == false);

return new ValueTask(_shutdownTcs.Task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal sealed class MsQuicListener : QuicListenerProvider, IDisposable
private QuicListenerOptions _options;
private volatile bool _disposed;
private IPEndPoint _listenEndPoint;

private bool _started;
private readonly Channel<MsQuicConnection> _acceptConnectionQueue;

internal MsQuicListener(QuicListenerOptions options)
Expand Down Expand Up @@ -120,6 +120,13 @@ internal override void Start()
{
ThrowIfDisposed();

// protect against double starts.
if (_started)
{
throw new QuicException("Cannot start Listener multiple times");
}

_started = true;
SetCallbackHandler();

SOCKADDR_INET address = MsQuicAddressHelpers.IPEndPointToINet(_listenEndPoint);
Expand Down Expand Up @@ -202,7 +209,7 @@ private static uint NativeCallbackHandler(

internal void SetCallbackHandler()
{
Debug.Assert(!_handle.IsAllocated);
Debug.Assert(!_handle.IsAllocated, "listener allocated");
_handle = GCHandle.Alloc(this);

MsQuicApi.Api.SetCallbackHandlerDelegate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ internal sealed class MsQuicStream : QuicStreamProvider
// Creates a new MsQuicStream
internal MsQuicStream(MsQuicConnection connection, QUIC_STREAM_OPEN_FLAG flags, IntPtr nativeObjPtr, bool inbound)
{
Debug.Assert(connection != null);
Debug.Assert(connection != null, "Connection null");

_ptr = nativeObjPtr;

Expand Down Expand Up @@ -936,7 +936,7 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(
/// </summary>
private void StartLocalStream()
{
Debug.Assert(!_started);
Debug.Assert(!_started, "start local stream");
uint status = MsQuicApi.Api.StreamStartDelegate(
_ptr,
(uint)QUIC_STREAM_START_FLAG.ASYNC);
Expand Down
17 changes: 17 additions & 0 deletions src/libraries/System.Net.Quic/tests/FunctionalTests/MsQuicTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ public async Task CallDifferentWriteMethodsWorks()
Assert.Equal(24, res);
}

[Fact]
public async Task CloseAsync_ByServer_AcceptThrows()
{
await RunClientServer(
clientConnection =>
{
return Task.CompletedTask;
},
async serverConnection =>
{
var acceptTask = serverConnection.AcceptStreamAsync();
await serverConnection.CloseAsync(errorCode: 0);
// make sure
await Assert.ThrowsAsync<QuicOperationAbortedException>(() => acceptTask.AsTask());
});
}

private static ReadOnlySequence<byte> CreateReadOnlySequenceFromBytes(byte[] data)
{
List<byte[]> segments = new List<byte[]>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ await Task.Run(async () =>
using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);
await clientConnection.ConnectAsync();
var clientStreamTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
}).TimeoutAfter(millisecondsTimeout: 5_000);
await clientStreamTask;
}).TimeoutAfter(millisecondsTimeout: 6_000);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,6 @@ public async Task LargeDataSentAndReceived()
}
}



[Fact]
public async Task TestStreams()
{
Expand Down

0 comments on commit 1503364

Please sign in to comment.