From 9dd028f026d1fffe73a86028c1a352f7225cc94c Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 2 May 2022 18:52:04 +0200 Subject: [PATCH 1/8] Merge several version of MsQuicStream SendAsync code --- .../Implementations/MsQuic/MsQuicStream.cs | 340 ++++++++---------- 1 file changed, 143 insertions(+), 197 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 0ef90fcea3c27..a20aae57ba66b 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -277,15 +277,9 @@ internal override ValueTask WriteAsync(ReadOnlySequence buffers, Cancellat return WriteAsync(buffers, endStream: false, cancellationToken); } - internal override async ValueTask WriteAsync(ReadOnlySequence buffers, bool endStream, CancellationToken cancellationToken = default) + internal override ValueTask WriteAsync(ReadOnlySequence buffers, bool endStream, CancellationToken cancellationToken = default) { - ThrowIfDisposed(); - - using CancellationTokenRegistration registration = SetupWriteStartState(buffers.IsEmpty, cancellationToken); - - await SendReadOnlySequenceAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); - - CleanupWriteCompletedState(); + return WriteAsync(new WriteSequenceAdapter(buffers), endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, cancellationToken); } internal override ValueTask WriteAsync(ReadOnlyMemory> buffers, CancellationToken cancellationToken = default) @@ -293,26 +287,63 @@ internal override ValueTask WriteAsync(ReadOnlyMemory> buff return WriteAsync(buffers, endStream: false, cancellationToken); } - internal override async ValueTask WriteAsync(ReadOnlyMemory> buffers, bool endStream, CancellationToken cancellationToken = default) + internal override ValueTask WriteAsync(ReadOnlyMemory> buffers, bool endStream, CancellationToken cancellationToken = default) + { + return WriteAsync(new WriteMemoryOfMemoryAdapter(buffers), endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, cancellationToken); + } + + internal override ValueTask WriteAsync(ReadOnlyMemory buffer, bool endStream, CancellationToken cancellationToken = default) + { + return WriteAsync(new WriteMemoryAdapter(buffer), endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, cancellationToken); + } + + private async ValueTask WriteAsync(TAdapter adapter, QUIC_SEND_FLAGS flags, CancellationToken cancellationToken) where TAdapter : ISendBufferAdapter { ThrowIfDisposed(); - using CancellationTokenRegistration registration = SetupWriteStartState(buffers.IsEmpty, cancellationToken); + using CancellationTokenRegistration registration = SetupWriteStartState(adapter.IsEmpty, cancellationToken); - await SendReadOnlyMemoryListAsync(buffers, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); + await WriteAsyncInternal(adapter, flags).ConfigureAwait(false); CleanupWriteCompletedState(); } - internal override async ValueTask WriteAsync(ReadOnlyMemory buffer, bool endStream, CancellationToken cancellationToken = default) + private unsafe ValueTask WriteAsyncInternal(TAdapter adapter, QUIC_SEND_FLAGS flags) where TAdapter : ISendBufferAdapter { - ThrowIfDisposed(); + if (adapter.IsEmpty) + { + if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) + { + // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. + StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); + } + return default; + } - using CancellationTokenRegistration registration = SetupWriteStartState(buffer.IsEmpty, cancellationToken); + adapter.SetupSendState(_state); - await SendReadOnlyMemoryAsync(buffer, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); + Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); + uint status = MsQuicApi.Api.StreamSendDelegate( + _state.Handle, + (QuicBuffer*)_state.SendQuicBuffers, + (uint)_state.SendBufferCount, + flags, + IntPtr.Zero); - CleanupWriteCompletedState(); + if (!MsQuicStatusHelper.SuccessfulStatusCode(status)) + { + CleanupWriteFailedState(); + CleanupSendState(_state); + + if (status == MsQuicStatusCodes.Aborted) + { + throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); + } + QuicExceptionHelpers.ThrowIfFailed(status, + "Could not send data to peer."); + } + + return _state.SendResettableCompletionSource.GetTypelessValueTask(); } private CancellationTokenRegistration SetupWriteStartState(bool emptyBuffer, CancellationToken cancellationToken) @@ -1394,187 +1425,6 @@ private static void CleanupSendState(State state) } } - // TODO prevent overlapping sends or consider supporting it. - private unsafe ValueTask SendReadOnlyMemoryAsync( - ReadOnlyMemory buffer, - QUIC_SEND_FLAGS flags) - { - if (buffer.IsEmpty) - { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) - { - // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. - StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); - } - return default; - } - - MemoryHandle handle = buffer.Pin(); - if (_state.SendQuicBuffers == IntPtr.Zero) - { - _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QUIC_BUFFER)); - _state.SendBufferMaxCount = 1; - } - - QUIC_BUFFER* quicBuffers = (QUIC_BUFFER*)_state.SendQuicBuffers; - quicBuffers->Length = (uint)buffer.Length; - quicBuffers->Buffer = (byte*)handle.Pointer; - - _state.BufferArrays[0] = handle; - _state.SendBufferCount = 1; - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - int status = MsQuicApi.Api.ApiTable->StreamSend( - _state.Handle.QuicHandle, - quicBuffers, - 1, - flags, - (void*)IntPtr.Zero); - - if (!StatusSucceeded(status)) - { - CleanupWriteFailedState(); - CleanupSendState(_state); - - if (status == QUIC_STATUS_ABORTED) - { - throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); - } - ThrowIfFailure(status, "Could not send data to peer"); - } - - return _state.SendResettableCompletionSource.GetTypelessValueTask(); - } - - private unsafe ValueTask SendReadOnlySequenceAsync( - ReadOnlySequence buffers, - QUIC_SEND_FLAGS flags) - { - if (buffers.IsEmpty) - { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) - { - // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. - StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); - } - return default; - } - - int count = 0; - - foreach (ReadOnlyMemory buffer in buffers) - { - ++count; - } - - if (_state.SendBufferMaxCount < count) - { - Marshal.FreeHGlobal(_state.SendQuicBuffers); - _state.SendQuicBuffers = IntPtr.Zero; - _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QUIC_BUFFER) * count); - _state.SendBufferMaxCount = count; - _state.BufferArrays = new MemoryHandle[count]; - } - - _state.SendBufferCount = count; - count = 0; - - QUIC_BUFFER* quicBuffers = (QUIC_BUFFER*)_state.SendQuicBuffers; - foreach (ReadOnlyMemory buffer in buffers) - { - MemoryHandle handle = buffer.Pin(); - quicBuffers[count].Length = (uint)buffer.Length; - quicBuffers[count].Buffer = (byte*)handle.Pointer; - _state.BufferArrays[count] = handle; - ++count; - } - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - int status = MsQuicApi.Api.ApiTable->StreamSend( - _state.Handle.QuicHandle, - quicBuffers, - (uint)count, - flags, - (void*)IntPtr.Zero); - - if (!StatusSucceeded(status)) - { - CleanupWriteFailedState(); - CleanupSendState(_state); - - if (status == QUIC_STATUS_ABORTED) - { - throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); - } - ThrowIfFailure(status, "Could not send data to peer"); - } - - return _state.SendResettableCompletionSource.GetTypelessValueTask(); - } - - private unsafe ValueTask SendReadOnlyMemoryListAsync( - ReadOnlyMemory> buffers, - QUIC_SEND_FLAGS flags) - { - if (buffers.IsEmpty) - { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) - { - // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. - StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); - } - return default; - } - - ReadOnlyMemory[] array = buffers.ToArray(); - - uint length = (uint)array.Length; - - if (_state.SendBufferMaxCount < array.Length) - { - Marshal.FreeHGlobal(_state.SendQuicBuffers); - _state.SendQuicBuffers = IntPtr.Zero; - _state.SendQuicBuffers = Marshal.AllocHGlobal(sizeof(QUIC_BUFFER) * array.Length); - _state.SendBufferMaxCount = array.Length; - _state.BufferArrays = new MemoryHandle[array.Length]; - } - - _state.SendBufferCount = array.Length; - QUIC_BUFFER* quicBuffers = (QUIC_BUFFER*)_state.SendQuicBuffers; - for (int i = 0; i < length; i++) - { - ReadOnlyMemory buffer = array[i]; - MemoryHandle handle = buffer.Pin(); - - quicBuffers[i].Length = (uint)buffer.Length; - quicBuffers[i].Buffer = (byte*)handle.Pointer; - - _state.BufferArrays[i] = handle; - } - - Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - int status = MsQuicApi.Api.ApiTable->StreamSend( - _state.Handle.QuicHandle, - quicBuffers, - length, - flags, - (void*)IntPtr.Zero); - - if (!StatusSucceeded(status)) - { - CleanupWriteFailedState(); - CleanupSendState(_state); - - if (status == QUIC_STATUS_ABORTED) - { - throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); - } - ThrowIfFailure(status, "Could not send data to peer"); - } - - return _state.SendResettableCompletionSource.GetTypelessValueTask(); - } - private unsafe void ReceiveComplete(int bufferLength) { Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); @@ -1857,5 +1707,101 @@ private enum SendState /// Closed } + + private interface ISendBufferAdapter + { + void SetupSendState(State state); + + bool IsEmpty { get; } + + static unsafe void Reserve(State state, int count) + { + if (state.SendBufferMaxCount < count) + { + NativeMemory.Free((void*)state.SendQuicBuffers); + state.SendQuicBuffers = IntPtr.Zero; + state.SendQuicBuffers = (IntPtr)NativeMemory.Alloc((nuint)count, (nuint)sizeof(QuicBuffer)); + state.SendBufferMaxCount = count; + state.BufferArrays = new MemoryHandle[count]; + } + } + + static unsafe void SetBuffer(State state, int index, ReadOnlyMemory buffer) + { + MemoryHandle handle = buffer.Pin(); + QuicBuffer* quicBuffer = (QuicBuffer*)state.SendQuicBuffers + index; + quicBuffer->Length = (uint)buffer.Length; + quicBuffer->Buffer = (byte*)handle.Pointer; + state.BufferArrays[index] = handle; + } + } + + private struct WriteMemoryAdapter : ISendBufferAdapter + { + private readonly ReadOnlyMemory _buffer; + + public WriteMemoryAdapter(ReadOnlyMemory buffer) => _buffer = buffer; + + public bool IsEmpty => _buffer.IsEmpty; + + public unsafe void SetupSendState(State state) + { + ISendBufferAdapter.Reserve(state, 1); + ISendBufferAdapter.SetBuffer(state, 0, _buffer); + state.SendBufferCount = 1; + } + } + + private struct WriteSequenceAdapter : ISendBufferAdapter + { + private readonly ReadOnlySequence _buffers; + + public WriteSequenceAdapter(ReadOnlySequence buffers) => _buffers = buffers; + + public bool IsEmpty => _buffers.IsEmpty; + + public unsafe void SetupSendState(State state) + { + int count = 0; + + foreach (ReadOnlyMemory _ in _buffers) + { + ++count; + } + + ISendBufferAdapter.Reserve(state, count); + state.SendBufferCount = count; + count = 0; + + foreach (ReadOnlyMemory buffer in _buffers) + { + ISendBufferAdapter.SetBuffer(state, count, buffer); + ++count; + } + } + } + + private struct WriteMemoryOfMemoryAdapter : ISendBufferAdapter + { + private readonly ReadOnlyMemory> _buffers; + + public WriteMemoryOfMemoryAdapter(ReadOnlyMemory> buffers) => _buffers = buffers; + + public bool IsEmpty => _buffers.IsEmpty; + + public unsafe void SetupSendState(State state) + { + int count = _buffers.Length; + ISendBufferAdapter.Reserve(state, count); + state.SendBufferCount = count; + count = 0; + + foreach (ReadOnlyMemory buffer in _buffers.Span) + { + ISendBufferAdapter.SetBuffer(state, count, buffer); + ++count; + } + } + } } } From d5feac52ab3795f1abafbdc92a6db52496d60c12 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 5 May 2022 14:47:12 +0200 Subject: [PATCH 2/8] Use shared MsQuicBuffers struct --- .../MsQuic/Internal/MsQuicBuffers.cs | 85 +++++++++++++++--- .../Implementations/MsQuic/MsQuicStream.cs | 88 ++++--------------- 2 files changed, 90 insertions(+), 83 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs index e98e9777058fc..786bca1e4c5ff 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs @@ -39,8 +39,27 @@ private void FreeNativeMemory() NativeMemory.Free(buffers); } + private void Reserve(int count) + { + if (_handles.Length < count) + { + _handles = new MemoryHandle[count]; + FreeNativeMemory(); + _buffers = (QUIC_BUFFER*)NativeMemory.Alloc((nuint)count, (nuint)sizeof(QUIC_BUFFER)); + } + } + + private void SetBuffer(int index, ReadOnlyMemory buffer) + { + MemoryHandle handle = buffer.Pin(); + + _handles[index] = handle; + _buffers[index].Buffer = (byte*)handle.Pointer; + _buffers[index].Length = (uint)buffer.Length; + } + /// - /// The method initializes QUIC_BUFFER* with data from inputs, converted via toBuffer. + /// Initializes QUIC_BUFFER* with data from inputs, converted via toBuffer. /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. /// /// Inputs to get their byte array, pin them and pepare them to be passed to MsQuic as QUIC_BUFFER*. @@ -48,26 +67,66 @@ private void FreeNativeMemory() /// The type of the inputs. public void Initialize(IList inputs, Func> toBuffer) { - if (_handles.Length < inputs.Count) + Reserve(inputs.Count); + _count = inputs.Count; + + for (int i = 0; i < inputs.Count; ++i) { - _handles = new MemoryHandle[inputs.Count]; + ReadOnlyMemory buffer = toBuffer(inputs[i]); + SetBuffer(i, buffer); } - if (_count < inputs.Count) + } + + /// + /// Initializes QUIC_BUFFER* with the provided buffer. + /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. + /// + /// Buffer to be passed to MsQuic as QUIC_BUFFER*. + public void Initialize(ReadOnlyMemory buffer) + { + Reserve(1); + _count = 1; + SetBuffer(0, buffer); + } + + /// + /// Initializes QUIC_BUFFER* with the provided buffers. + /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. + /// + /// Buffers to be passed to MsQuic as QUIC_BUFFER*. + public void Initialize(ReadOnlySequence buffers) + { + int count = 0; + foreach (ReadOnlyMemory _ in buffers) { - FreeNativeMemory(); - _buffers = (QUIC_BUFFER*)NativeMemory.Alloc((nuint)sizeof(QUIC_BUFFER), (nuint)inputs.Count); + ++count; } - _count = inputs.Count; + Reserve(count); + _count = count; + count = 0; - for (int i = 0; i < inputs.Count; ++i) + foreach (ReadOnlyMemory buffer in buffers) { - ReadOnlyMemory buffer = toBuffer(inputs[i]); - MemoryHandle handle = buffer.Pin(); + SetBuffer(count, buffer); + ++count; + } + } - _handles[i] = handle; - _buffers[i].Buffer = (byte*)handle.Pointer; - _buffers[i].Length = (uint)buffer.Length; + /// + /// Initializes QUIC_BUFFER* with the provided buffers. + /// Note that the struct either needs to be freshly created via new or previously cleaned up with Reset. + /// + /// Buffers to be passed to MsQuic as QUIC_BUFFER*. + public void Initialize(ReadOnlyMemory> buffers) + { + int count = buffers.Length; + Reserve(count); + _count = count; + + for (int i = 0; i < buffers.Length; i++) + { + SetBuffer(i, buffers.Span[i]); } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index a20aae57ba66b..fa79886cc1a30 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -58,11 +58,7 @@ private sealed class State public SendState SendState; public long SendErrorCode = -1; - // Buffers to hold during a call to send. - public MemoryHandle[] BufferArrays = new MemoryHandle[1]; - public IntPtr SendQuicBuffers; - public int SendBufferMaxCount; - public int SendBufferCount; + public MsQuicBuffers SendBuffers; // Resettable completions to be used for multiple calls to send. public readonly ResettableCompletionSource SendResettableCompletionSource = new ResettableCompletionSource(); @@ -85,6 +81,11 @@ private sealed class State // Set once stream have been shutdown. public readonly TaskCompletionSource ShutdownCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + public State() + { + SendBuffers = new MsQuicBuffers(); + } + public void Cleanup() { if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"{Handle} releasing handles."); @@ -92,8 +93,7 @@ public void Cleanup() ShutdownState = ShutdownState.Finished; CleanupSendState(this); Handle?.Dispose(); - Marshal.FreeHGlobal(SendQuicBuffers); - SendQuicBuffers = IntPtr.Zero; + SendBuffers.Dispose(); if (StateGCHandle.IsAllocated) StateGCHandle.Free(); ConnectionState?.RemoveStream(null); } @@ -323,10 +323,10 @@ private unsafe ValueTask WriteAsyncInternal(TAdapter adapter, QUIC_SEN adapter.SetupSendState(_state); Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); - uint status = MsQuicApi.Api.StreamSendDelegate( - _state.Handle, - (QuicBuffer*)_state.SendQuicBuffers, - (uint)_state.SendBufferCount, + int status = MsQuicApi.Api.ApiTable->StreamSend( + _state.Handle.QuicHandle, + _state.SendBuffers.Buffers, + (uint)_state.SendBuffers.Count, flags, IntPtr.Zero); @@ -1416,12 +1416,7 @@ private static void CleanupSendState(State state) lock (state) { Debug.Assert(state.SendState != SendState.Pending); - Debug.Assert(state.SendBufferCount <= state.BufferArrays.Length); - - for (int i = 0; i < state.SendBufferCount; i++) - { - state.BufferArrays[i].Dispose(); - } + state.SendBuffers.Reset(); } } @@ -1713,27 +1708,6 @@ private interface ISendBufferAdapter void SetupSendState(State state); bool IsEmpty { get; } - - static unsafe void Reserve(State state, int count) - { - if (state.SendBufferMaxCount < count) - { - NativeMemory.Free((void*)state.SendQuicBuffers); - state.SendQuicBuffers = IntPtr.Zero; - state.SendQuicBuffers = (IntPtr)NativeMemory.Alloc((nuint)count, (nuint)sizeof(QuicBuffer)); - state.SendBufferMaxCount = count; - state.BufferArrays = new MemoryHandle[count]; - } - } - - static unsafe void SetBuffer(State state, int index, ReadOnlyMemory buffer) - { - MemoryHandle handle = buffer.Pin(); - QuicBuffer* quicBuffer = (QuicBuffer*)state.SendQuicBuffers + index; - quicBuffer->Length = (uint)buffer.Length; - quicBuffer->Buffer = (byte*)handle.Pointer; - state.BufferArrays[index] = handle; - } } private struct WriteMemoryAdapter : ISendBufferAdapter @@ -1744,11 +1718,9 @@ private struct WriteMemoryAdapter : ISendBufferAdapter public bool IsEmpty => _buffer.IsEmpty; - public unsafe void SetupSendState(State state) + public void SetupSendState(State state) { - ISendBufferAdapter.Reserve(state, 1); - ISendBufferAdapter.SetBuffer(state, 0, _buffer); - state.SendBufferCount = 1; + state.SendBuffers.Initialize(_buffer); } } @@ -1760,24 +1732,9 @@ private struct WriteSequenceAdapter : ISendBufferAdapter public bool IsEmpty => _buffers.IsEmpty; - public unsafe void SetupSendState(State state) + public void SetupSendState(State state) { - int count = 0; - - foreach (ReadOnlyMemory _ in _buffers) - { - ++count; - } - - ISendBufferAdapter.Reserve(state, count); - state.SendBufferCount = count; - count = 0; - - foreach (ReadOnlyMemory buffer in _buffers) - { - ISendBufferAdapter.SetBuffer(state, count, buffer); - ++count; - } + state.SendBuffers.Initialize(_buffers); } } @@ -1789,18 +1746,9 @@ private struct WriteMemoryOfMemoryAdapter : ISendBufferAdapter public bool IsEmpty => _buffers.IsEmpty; - public unsafe void SetupSendState(State state) + public void SetupSendState(State state) { - int count = _buffers.Length; - ISendBufferAdapter.Reserve(state, count); - state.SendBufferCount = count; - count = 0; - - foreach (ReadOnlyMemory buffer in _buffers.Span) - { - ISendBufferAdapter.SetBuffer(state, count, buffer); - ++count; - } + state.SendBuffers.Initialize(_buffers); } } } From 4589cbcb843d7efcb153fd4066b5b348ca514ea3 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 5 May 2022 14:53:37 +0200 Subject: [PATCH 3/8] Minor changes --- .../Implementations/MsQuic/MsQuicStream.cs | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index fa79886cc1a30..bb230e5f6c815 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -1706,50 +1706,31 @@ private enum SendState private interface ISendBufferAdapter { void SetupSendState(State state); - bool IsEmpty { get; } } private struct WriteMemoryAdapter : ISendBufferAdapter { private readonly ReadOnlyMemory _buffer; - public WriteMemoryAdapter(ReadOnlyMemory buffer) => _buffer = buffer; - public bool IsEmpty => _buffer.IsEmpty; - - public void SetupSendState(State state) - { - state.SendBuffers.Initialize(_buffer); - } + public void SetupSendState(State state) => state.SendBuffers.Initialize(_buffer); } private struct WriteSequenceAdapter : ISendBufferAdapter { private readonly ReadOnlySequence _buffers; - public WriteSequenceAdapter(ReadOnlySequence buffers) => _buffers = buffers; - public bool IsEmpty => _buffers.IsEmpty; - - public void SetupSendState(State state) - { - state.SendBuffers.Initialize(_buffers); - } + public void SetupSendState(State state) => state.SendBuffers.Initialize(_buffers); } private struct WriteMemoryOfMemoryAdapter : ISendBufferAdapter { private readonly ReadOnlyMemory> _buffers; - public WriteMemoryOfMemoryAdapter(ReadOnlyMemory> buffers) => _buffers = buffers; - public bool IsEmpty => _buffers.IsEmpty; - - public void SetupSendState(State state) - { - state.SendBuffers.Initialize(_buffers); - } + public void SetupSendState(State state) => state.SendBuffers.Initialize(_buffers); } } } From 491ce390e2cabd0d74543ddbf0564aa9cd056d98 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 12 May 2022 15:53:34 +0200 Subject: [PATCH 4/8] Fixes after rebase --- .../Net/Quic/Implementations/MsQuic/MsQuicStream.cs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index bb230e5f6c815..c86cecf0f50f4 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -328,19 +328,18 @@ private unsafe ValueTask WriteAsyncInternal(TAdapter adapter, QUIC_SEN _state.SendBuffers.Buffers, (uint)_state.SendBuffers.Count, flags, - IntPtr.Zero); + (void*)IntPtr.Zero); - if (!MsQuicStatusHelper.SuccessfulStatusCode(status)) + if (StatusFailed(status)) { CleanupWriteFailedState(); CleanupSendState(_state); - if (status == MsQuicStatusCodes.Aborted) + if (status == QUIC_STATUS_ABORTED) { throw ThrowHelper.GetConnectionAbortedException(_state.ConnectionState.AbortErrorCode); } - QuicExceptionHelpers.ThrowIfFailed(status, - "Could not send data to peer."); + ThrowIfFailure(status, "Could not send data to peer."); } return _state.SendResettableCompletionSource.GetTypelessValueTask(); From faf5e8b1d48b69d33cd75bc04c9f895107909c38 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Thu, 19 May 2022 19:42:16 +0200 Subject: [PATCH 5/8] Code review feedback --- .../MsQuic/Internal/MsQuicBuffers.cs | 16 +++++++--------- .../Quic/Implementations/MsQuic/MsQuicStream.cs | 4 ++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs index 786bca1e4c5ff..91dcd897e32d9 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs @@ -47,6 +47,8 @@ private void Reserve(int count) FreeNativeMemory(); _buffers = (QUIC_BUFFER*)NativeMemory.Alloc((nuint)count, (nuint)sizeof(QUIC_BUFFER)); } + + _count = count; } private void SetBuffer(int index, ReadOnlyMemory buffer) @@ -68,7 +70,6 @@ private void SetBuffer(int index, ReadOnlyMemory buffer) public void Initialize(IList inputs, Func> toBuffer) { Reserve(inputs.Count); - _count = inputs.Count; for (int i = 0; i < inputs.Count; ++i) { @@ -85,7 +86,6 @@ public void Initialize(IList inputs, Func> toBuffe public void Initialize(ReadOnlyMemory buffer) { Reserve(1); - _count = 1; SetBuffer(0, buffer); } @@ -103,13 +103,10 @@ public void Initialize(ReadOnlySequence buffers) } Reserve(count); - _count = count; - count = 0; - + int i = 0; foreach (ReadOnlyMemory buffer in buffers) { - SetBuffer(count, buffer); - ++count; + SetBuffer(i++, buffer); } } @@ -124,9 +121,10 @@ public void Initialize(ReadOnlyMemory> buffers) Reserve(count); _count = count; - for (int i = 0; i < buffers.Length; i++) + ReadOnlySpan> span = buffers.Span; + for (int i = 0; i < span.Length; i++) { - SetBuffer(i, buffers.Span[i]); + SetBuffer(i, span[i]); } } diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index c86cecf0f50f4..2e232a0488f3a 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -303,12 +303,12 @@ private async ValueTask WriteAsync(TAdapter adapter, QUIC_SEND_FLAGS f using CancellationTokenRegistration registration = SetupWriteStartState(adapter.IsEmpty, cancellationToken); - await WriteAsyncInternal(adapter, flags).ConfigureAwait(false); + await WriteAsyncCore(adapter, flags).ConfigureAwait(false); CleanupWriteCompletedState(); } - private unsafe ValueTask WriteAsyncInternal(TAdapter adapter, QUIC_SEND_FLAGS flags) where TAdapter : ISendBufferAdapter + private unsafe ValueTask WriteAsyncCore(TAdapter adapter, QUIC_SEND_FLAGS flags) where TAdapter : ISendBufferAdapter { if (adapter.IsEmpty) { From 668b81f31adfe542a41a73b9695d30973c364734 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 23 May 2022 13:07:43 +0200 Subject: [PATCH 6/8] Use static lambdas instead of adapter structs --- .../Implementations/MsQuic/MsQuicStream.cs | 48 ++++--------------- 1 file changed, 9 insertions(+), 39 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 2e232a0488f3a..5c8f16d6cd0cd 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -279,7 +279,7 @@ internal override ValueTask WriteAsync(ReadOnlySequence buffers, Cancellat internal override ValueTask WriteAsync(ReadOnlySequence buffers, bool endStream, CancellationToken cancellationToken = default) { - return WriteAsync(new WriteSequenceAdapter(buffers), endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, cancellationToken); + return WriteAsync(static (state, buffers) => state.SendBuffers.Initialize(buffers), buffers, buffers.IsEmpty, endStream, cancellationToken); } internal override ValueTask WriteAsync(ReadOnlyMemory> buffers, CancellationToken cancellationToken = default) @@ -289,28 +289,28 @@ internal override ValueTask WriteAsync(ReadOnlyMemory> buff internal override ValueTask WriteAsync(ReadOnlyMemory> buffers, bool endStream, CancellationToken cancellationToken = default) { - return WriteAsync(new WriteMemoryOfMemoryAdapter(buffers), endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, cancellationToken); + return WriteAsync(static (state, buffers) => state.SendBuffers.Initialize(buffers), buffers, buffers.IsEmpty, endStream, cancellationToken); } internal override ValueTask WriteAsync(ReadOnlyMemory buffer, bool endStream, CancellationToken cancellationToken = default) { - return WriteAsync(new WriteMemoryAdapter(buffer), endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, cancellationToken); + return WriteAsync(static (state, buffer) => state.SendBuffers.Initialize(buffer), buffer, buffer.IsEmpty, endStream, cancellationToken); } - private async ValueTask WriteAsync(TAdapter adapter, QUIC_SEND_FLAGS flags, CancellationToken cancellationToken) where TAdapter : ISendBufferAdapter + private async ValueTask WriteAsync(Action stateSetup, TBuffer buffer, bool isEmpty, bool endStream, CancellationToken cancellationToken) { ThrowIfDisposed(); - using CancellationTokenRegistration registration = SetupWriteStartState(adapter.IsEmpty, cancellationToken); + using CancellationTokenRegistration registration = SetupWriteStartState(isEmpty, cancellationToken); - await WriteAsyncCore(adapter, flags).ConfigureAwait(false); + await WriteAsyncCore(stateSetup, buffer, isEmpty, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); CleanupWriteCompletedState(); } - private unsafe ValueTask WriteAsyncCore(TAdapter adapter, QUIC_SEND_FLAGS flags) where TAdapter : ISendBufferAdapter + private unsafe ValueTask WriteAsyncCore(Action stateSetup, TBuffer buffer, bool isEmpty, QUIC_SEND_FLAGS flags) { - if (adapter.IsEmpty) + if (isEmpty) { if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) { @@ -320,7 +320,7 @@ private unsafe ValueTask WriteAsyncCore(TAdapter adapter, QUIC_SEND_FL return default; } - adapter.SetupSendState(_state); + stateSetup(_state, buffer); Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)"); int status = MsQuicApi.Api.ApiTable->StreamSend( @@ -1701,35 +1701,5 @@ private enum SendState /// Closed } - - private interface ISendBufferAdapter - { - void SetupSendState(State state); - bool IsEmpty { get; } - } - - private struct WriteMemoryAdapter : ISendBufferAdapter - { - private readonly ReadOnlyMemory _buffer; - public WriteMemoryAdapter(ReadOnlyMemory buffer) => _buffer = buffer; - public bool IsEmpty => _buffer.IsEmpty; - public void SetupSendState(State state) => state.SendBuffers.Initialize(_buffer); - } - - private struct WriteSequenceAdapter : ISendBufferAdapter - { - private readonly ReadOnlySequence _buffers; - public WriteSequenceAdapter(ReadOnlySequence buffers) => _buffers = buffers; - public bool IsEmpty => _buffers.IsEmpty; - public void SetupSendState(State state) => state.SendBuffers.Initialize(_buffers); - } - - private struct WriteMemoryOfMemoryAdapter : ISendBufferAdapter - { - private readonly ReadOnlyMemory> _buffers; - public WriteMemoryOfMemoryAdapter(ReadOnlyMemory> buffers) => _buffers = buffers; - public bool IsEmpty => _buffers.IsEmpty; - public void SetupSendState(State state) => state.SendBuffers.Initialize(_buffers); - } } } From de2b5d221dea6150d2140c317e8a102a031d65b4 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 23 May 2022 13:37:03 +0200 Subject: [PATCH 7/8] Minor change --- .../Net/Quic/Implementations/MsQuic/MsQuicStream.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs index 5c8f16d6cd0cd..b599fceaabcb0 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/MsQuicStream.cs @@ -303,16 +303,16 @@ private async ValueTask WriteAsync(Action stateSetup, T using CancellationTokenRegistration registration = SetupWriteStartState(isEmpty, cancellationToken); - await WriteAsyncCore(stateSetup, buffer, isEmpty, endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE).ConfigureAwait(false); + await WriteAsyncCore(stateSetup, buffer, isEmpty, endStream).ConfigureAwait(false); CleanupWriteCompletedState(); } - private unsafe ValueTask WriteAsyncCore(Action stateSetup, TBuffer buffer, bool isEmpty, QUIC_SEND_FLAGS flags) + private unsafe ValueTask WriteAsyncCore(Action stateSetup, TBuffer buffer, bool isEmpty, bool endStream) { if (isEmpty) { - if ((flags & QUIC_SEND_FLAGS.FIN) == QUIC_SEND_FLAGS.FIN) + if (endStream) { // Start graceful shutdown sequence if passed in the fin flag and there is an empty buffer. StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0); @@ -327,7 +327,7 @@ private unsafe ValueTask WriteAsyncCore(Action stateSet _state.Handle.QuicHandle, _state.SendBuffers.Buffers, (uint)_state.SendBuffers.Count, - flags, + endStream ? QUIC_SEND_FLAGS.FIN : QUIC_SEND_FLAGS.NONE, (void*)IntPtr.Zero); if (StatusFailed(status)) From 7247995f33ecdb08fe186126da48fe28f5cb4a99 Mon Sep 17 00:00:00 2001 From: Radek Zikmund Date: Mon, 23 May 2022 14:21:41 +0200 Subject: [PATCH 8/8] fixup! Minor change --- .../Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs index 91dcd897e32d9..cc0c9c177739c 100644 --- a/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs +++ b/src/libraries/System.Net.Quic/src/System/Net/Quic/Implementations/MsQuic/Internal/MsQuicBuffers.cs @@ -119,7 +119,6 @@ public void Initialize(ReadOnlyMemory> buffers) { int count = buffers.Length; Reserve(count); - _count = count; ReadOnlySpan> span = buffers.Span; for (int i = 0; i < span.Length; i++)