diff --git a/samples/System.IO.Pipelines.Samples/CompressionSample.cs b/samples/System.IO.Pipelines.Samples/CompressionSample.cs index 74501db7cd0..353cd898069 100644 --- a/samples/System.IO.Pipelines.Samples/CompressionSample.cs +++ b/samples/System.IO.Pipelines.Samples/CompressionSample.cs @@ -10,7 +10,7 @@ public class CompressionSample { public static void Run() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var filePath = Path.GetFullPath("Program.cs"); diff --git a/samples/System.IO.Pipelines.Samples/Framing/Codec.cs b/samples/System.IO.Pipelines.Samples/Framing/Codec.cs index a76f7b3c462..1e5967a9127 100644 --- a/samples/System.IO.Pipelines.Samples/Framing/Codec.cs +++ b/samples/System.IO.Pipelines.Samples/Framing/Codec.cs @@ -81,7 +81,7 @@ public static void Run() thread.Dispose(); } - public static IPipelineConnection MakePipeline(IPipelineConnection connection) + public static IPipeConnection MakePipeline(IPipeConnection connection) { // Do something fancy here to wrap the connection, SSL etc return connection; @@ -97,7 +97,7 @@ public class LineHandler : IFrameHandler { private PipelineTextOutput _textOutput; - public void Initialize(IPipelineConnection connection) + public void Initialize(IPipeConnection connection) { _textOutput = new PipelineTextOutput(connection.Output, EncodingData.InvariantUtf8); } @@ -135,7 +135,7 @@ public interface IFrameDecoder public interface IFrameHandler { - void Initialize(IPipelineConnection connection); + void Initialize(IPipeConnection connection); Task HandleAsync(TInput message); } diff --git a/samples/System.IO.Pipelines.Samples/HttpClient/PipelineHttpContent.cs b/samples/System.IO.Pipelines.Samples/HttpClient/PipelineHttpContent.cs index db9ea0ef57f..93d097f2a2d 100644 --- a/samples/System.IO.Pipelines.Samples/HttpClient/PipelineHttpContent.cs +++ b/samples/System.IO.Pipelines.Samples/HttpClient/PipelineHttpContent.cs @@ -10,9 +10,9 @@ namespace System.IO.Pipelines.Samples { public class PipelineHttpContent : HttpContent { - private readonly IPipelineReader _output; + private readonly IPipeReader _output; - public PipelineHttpContent(IPipelineReader output) + public PipelineHttpContent(IPipeReader output) { _output = output; } diff --git a/samples/System.IO.Pipelines.Samples/HttpServer/HttpConnection.cs b/samples/System.IO.Pipelines.Samples/HttpServer/HttpConnection.cs index 8961fb7475d..2f8cdd79152 100644 --- a/samples/System.IO.Pipelines.Samples/HttpServer/HttpConnection.cs +++ b/samples/System.IO.Pipelines.Samples/HttpServer/HttpConnection.cs @@ -14,8 +14,8 @@ public partial class HttpConnection private static readonly byte[] _chunkedEndBytes = Encoding.UTF8.GetBytes("0\r\n\r\n"); private static readonly byte[] _endChunkBytes = Encoding.ASCII.GetBytes("\r\n"); - private readonly IPipelineReader _input; - private readonly IPipelineWriter _output; + private readonly IPipeReader _input; + private readonly IPipeWriter _output; private readonly IHttpApplication _application; public RequestHeaderDictionary RequestHeaders => _parser.RequestHeaders; @@ -38,7 +38,7 @@ public partial class HttpConnection private HttpRequestParser _parser = new HttpRequestParser(); - public HttpConnection(IHttpApplication application, IPipelineReader input, IPipelineWriter output) + public HttpConnection(IHttpApplication application, IPipeReader input, IPipeWriter output) { _application = application; _input = input; @@ -47,9 +47,9 @@ public HttpConnection(IHttpApplication application, IPipelineReader in _responseBody = new HttpResponseStream(this); } - public IPipelineReader Input => _input; + public IPipeReader Input => _input; - public IPipelineWriter Output => _output; + public IPipeWriter Output => _output; public HttpRequestStream RequestBody { get; set; } diff --git a/samples/System.IO.Pipelines.Samples/HttpServer/HttpServer.cs b/samples/System.IO.Pipelines.Samples/HttpServer/HttpServer.cs index be87e6715f9..aa5b74b8833 100644 --- a/samples/System.IO.Pipelines.Samples/HttpServer/HttpServer.cs +++ b/samples/System.IO.Pipelines.Samples/HttpServer/HttpServer.cs @@ -91,7 +91,7 @@ private async void StartAcceptingConnections(IHttpApplication(IHttpApplication(IHttpApplication application, PipelineFactory pipelineFactory, Socket socket) + private static async Task ProcessConnection(IHttpApplication application, PipeFactory pipeFactory, Socket socket) { using (var ns = new NetworkStream(socket)) { - using (var connection = pipelineFactory.CreateConnection(ns)) + using (var connection = pipeFactory.CreateConnection(ns)) { await ProcessClient(application, connection); } } } - private static async Task ProcessClient(IHttpApplication application, IPipelineConnection pipelineConnection) + private static async Task ProcessClient(IHttpApplication application, IPipeConnection pipeConnection) { - var connection = new HttpConnection(application, pipelineConnection.Input, pipelineConnection.Output); + var connection = new HttpConnection(application, pipeConnection.Input, pipeConnection.Output); await connection.ProcessAllRequests(); } diff --git a/samples/System.IO.Pipelines.Samples/RawLibuvHttpClientSample.cs b/samples/System.IO.Pipelines.Samples/RawLibuvHttpClientSample.cs index 1accb9255d8..011c22ea6eb 100644 --- a/samples/System.IO.Pipelines.Samples/RawLibuvHttpClientSample.cs +++ b/samples/System.IO.Pipelines.Samples/RawLibuvHttpClientSample.cs @@ -17,7 +17,7 @@ public static async Task Run() var thread = new UvThread(); var client = new UvTcpClient(thread, new IPEndPoint(IPAddress.Loopback, 5000)); - var consoleOutput = thread.PipelineFactory.CreateWriter(Console.OpenStandardOutput()); + var consoleOutput = thread.PipeFactory.CreateWriter(Console.OpenStandardOutput()); var connection = await client.ConnectAsync(); @@ -36,7 +36,7 @@ public static async Task Run() await Task.Delay(1000); } } - private static async Task CopyCompletedAsync(IPipelineReader input, IPipelineWriter output) + private static async Task CopyCompletedAsync(IPipeReader input, IPipeWriter output) { var result = await input.ReadAsync(); var inputBuffer = result.Buffer; diff --git a/src/System.IO.Pipelines.Compression/CompressionPipelineExtensions.cs b/src/System.IO.Pipelines.Compression/CompressionPipelineExtensions.cs index 8a403bbb54d..74511fa20a6 100644 --- a/src/System.IO.Pipelines.Compression/CompressionPipelineExtensions.cs +++ b/src/System.IO.Pipelines.Compression/CompressionPipelineExtensions.cs @@ -9,61 +9,61 @@ namespace System.IO.Pipelines.Compression { public static class CompressionPipelineExtensions { - public static IPipelineReader DeflateDecompress(this IPipelineReader reader, PipelineFactory factory) + public static IPipeReader DeflateDecompress(this IPipeReader reader, PipeFactory factory) { var inflater = new ReadableDeflateTransform(ZLibNative.Deflate_DefaultWindowBits); return factory.CreateReader(reader, inflater.Execute); } - public static IPipelineReader DeflateCompress(this IPipelineReader reader, PipelineFactory factory, CompressionLevel compressionLevel) + public static IPipeReader DeflateCompress(this IPipeReader reader, PipeFactory factory, CompressionLevel compressionLevel) { var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.Deflate_DefaultWindowBits); return factory.CreateReader(reader, deflater.Execute); } - public static IPipelineReader GZipDecompress(this IPipelineReader reader, PipelineFactory factory) + public static IPipeReader GZipDecompress(this IPipeReader reader, PipeFactory factory) { var inflater = new ReadableDeflateTransform(ZLibNative.GZip_DefaultWindowBits); return factory.CreateReader(reader, inflater.Execute); } - public static IPipelineWriter GZipCompress(this IPipelineWriter writer, PipelineFactory factory, CompressionLevel compressionLevel) + public static IPipeWriter GZipCompress(this IPipeWriter writer, PipeFactory factory, CompressionLevel compressionLevel) { var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits); return factory.CreateWriter(writer, deflater.Execute); } - public static IPipelineReader GZipCompress(this IPipelineReader reader, PipelineFactory factory, CompressionLevel compressionLevel) + public static IPipeReader GZipCompress(this IPipeReader reader, PipeFactory factory, CompressionLevel compressionLevel) { var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits); return factory.CreateReader(reader, deflater.Execute); } - public static IPipelineReader CreateDeflateDecompressReader(this PipelineFactory factory, IPipelineReader reader) + public static IPipeReader CreateDeflateDecompressReader(this PipeFactory factory, IPipeReader reader) { var inflater = new ReadableDeflateTransform(ZLibNative.Deflate_DefaultWindowBits); return factory.CreateReader(reader, inflater.Execute); } - public static IPipelineReader CreateDeflateCompressReader(this PipelineFactory factory, IPipelineReader reader, CompressionLevel compressionLevel) + public static IPipeReader CreateDeflateCompressReader(this PipeFactory factory, IPipeReader reader, CompressionLevel compressionLevel) { var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.Deflate_DefaultWindowBits); return factory.CreateReader(reader, deflater.Execute); } - public static IPipelineReader CreateGZipDecompressReader(this PipelineFactory factory, IPipelineReader reader) + public static IPipeReader CreateGZipDecompressReader(this PipeFactory factory, IPipeReader reader) { var inflater = new ReadableDeflateTransform(ZLibNative.GZip_DefaultWindowBits); return factory.CreateReader(reader, inflater.Execute); } - public static IPipelineWriter CreateGZipCompressWriter(this PipelineFactory factory, IPipelineWriter writer, CompressionLevel compressionLevel) + public static IPipeWriter CreateGZipCompressWriter(this PipeFactory factory, IPipeWriter writer, CompressionLevel compressionLevel) { var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits); return factory.CreateWriter(writer, deflater.Execute); } - public static IPipelineReader CreateGZipCompressReader(this PipelineFactory factory, IPipelineReader reader, CompressionLevel compressionLevel) + public static IPipeReader CreateGZipCompressReader(this PipeFactory factory, IPipeReader reader, CompressionLevel compressionLevel) { var deflater = new WritableDeflateTransform(compressionLevel, ZLibNative.GZip_DefaultWindowBits); return factory.CreateReader(reader, deflater.Execute); @@ -78,7 +78,7 @@ public WritableDeflateTransform(CompressionLevel compressionLevel, int bits) _deflater = new Deflater(compressionLevel, bits); } - public async Task Execute(IPipelineReader reader, IPipelineWriter writer) + public async Task Execute(IPipeReader reader, IPipeWriter writer) { while (true) { @@ -207,7 +207,7 @@ public ReadableDeflateTransform(int bits) _inflater = new Inflater(bits); } - public async Task Execute(IPipelineReader reader, IPipelineWriter writer) + public async Task Execute(IPipeReader reader, IPipeWriter writer) { while (true) { diff --git a/src/System.IO.Pipelines.File/FileReader.cs b/src/System.IO.Pipelines.File/FileReader.cs index 672a6946748..f817fb3e60a 100644 --- a/src/System.IO.Pipelines.File/FileReader.cs +++ b/src/System.IO.Pipelines.File/FileReader.cs @@ -9,13 +9,13 @@ namespace System.IO.Pipelines.File { - public class FileReader : PipelineReader + public class FileReader : PipeReader { public FileReader(MemoryPool pool) : base(pool) { } - public FileReader(Pipe pipe) : base(pipe) + public FileReader(IPipe pipe) : base(pipe) { } @@ -29,7 +29,7 @@ public unsafe void OpenReadFile(string path) var readOperation = new ReadOperation { - Writer = _pipe, + Writer = Pipe.Writer, FileHandle = fileHandle, ThreadPoolBoundHandle = handle, IOCallback = IOCallback @@ -38,7 +38,7 @@ public unsafe void OpenReadFile(string path) var overlapped = new PreAllocatedOverlapped(IOCallback, readOperation, null); readOperation.PreAllocatedOverlapped = overlapped; - _pipe.ReadingStarted.ContinueWith((t, state) => + Pipe.ReadingStarted.ContinueWith((t, state) => { ((ReadOperation)state).Read(); }, @@ -103,7 +103,7 @@ private class ReadOperation public unsafe NativeOverlapped* Overlapped { get; set; } - public IPipelineWriter Writer { get; set; } + public IPipeWriter Writer { get; set; } public StrongBox BoxedBuffer { get; set; } diff --git a/src/System.IO.Pipelines.File/ReadableFilePipelineFactoryExtensions.cs b/src/System.IO.Pipelines.File/ReadableFilePipelineFactoryExtensions.cs index ca7e0350fe1..617ac2b2c97 100644 --- a/src/System.IO.Pipelines.File/ReadableFilePipelineFactoryExtensions.cs +++ b/src/System.IO.Pipelines.File/ReadableFilePipelineFactoryExtensions.cs @@ -10,7 +10,7 @@ namespace System.IO.Pipelines.File { public static class ReadableFilePipelineFactoryExtensions { - public static IPipelineReader ReadFile(this PipelineFactory factory, string path) + public static IPipeReader ReadFile(this PipeFactory factory, string path) { var pipe = factory.Create(); diff --git a/src/System.IO.Pipelines.Networking.Libuv/UvTcpConnection.cs b/src/System.IO.Pipelines.Networking.Libuv/UvTcpConnection.cs index 4bc441d19e2..2ead4591050 100644 --- a/src/System.IO.Pipelines.Networking.Libuv/UvTcpConnection.cs +++ b/src/System.IO.Pipelines.Networking.Libuv/UvTcpConnection.cs @@ -9,7 +9,7 @@ namespace System.IO.Pipelines.Networking.Libuv { - public class UvTcpConnection : IPipelineConnection + public class UvTcpConnection : IPipeConnection { private const int EOF = -4095; @@ -17,8 +17,8 @@ public class UvTcpConnection : IPipelineConnection private static readonly Func _allocCallback = AllocCallback; private static readonly Action _writeCallback = WriteCallback; - protected readonly Pipe _input; - protected readonly Pipe _output; + protected readonly IPipe _input; + protected readonly IPipe _output; private readonly UvThread _thread; private readonly UvTcpHandle _handle; private volatile bool _stopping; @@ -34,14 +34,14 @@ public UvTcpConnection(UvThread thread, UvTcpHandle handle) _thread = thread; _handle = handle; - _input = _thread.PipelineFactory.Create(new PipeOptions + _input = _thread.PipeFactory.Create(new PipeOptions { // ReaderScheduler = TaskRunScheduler.Default, // execute user code on the thread pool // ReaderScheduler = thread, WriterScheduler = thread // resume from back pressure on the uv thread }); - _output = _thread.PipelineFactory.Create(new PipeOptions + _output = _thread.PipeFactory.Create(new PipeOptions { // WriterScheduler = TaskRunScheduler.Default, // Execute the flush callback on the thread pool // WriterScheduler = thread, @@ -60,17 +60,17 @@ public void Dispose() protected virtual void Dispose(bool disposing) { _stopping = true; - _output.CancelPendingRead(); + _output.Reader.CancelPendingRead(); _sendingTask.Wait(); - _output.CompleteWriter(); - _input.CompleteReader(); + _output.Writer.Complete(); + _input.Reader.Complete(); } - public IPipelineWriter Output => _output; + public IPipeWriter Output => _output.Writer; - public IPipelineReader Input => _input; + public IPipeReader Input => _input.Reader; private async Task ProcessWrites() { @@ -78,7 +78,7 @@ private async Task ProcessWrites() { while (!_stopping) { - var result = await _output.ReadAsync(); + var result = await _output.Reader.ReadAsync(); var buffer = result.Buffer; try @@ -95,17 +95,17 @@ private async Task ProcessWrites() } finally { - _output.Advance(buffer.End); + _output.Reader.Advance(buffer.End); } } } catch (Exception ex) { - _output.CompleteReader(ex); + _output.Reader.Complete(ex); } finally { - _output.CompleteReader(); + _output.Reader.Complete(); // Drain the pending writes if (_pendingWrites > 0) @@ -118,7 +118,7 @@ private async Task ProcessWrites() _handle.Dispose(); // We'll never call the callback after disposing the handle - _input.CompleteWriter(); + _input.Writer.Complete(); } } @@ -194,7 +194,7 @@ private async void OnRead(UvStreamHandle handle, int status) // REVIEW: Should we treat ECONNRESET as an error? // Ignore the error for now - _input.CompleteWriter(); + _input.Writer.Complete(); } else { @@ -222,7 +222,7 @@ private async void OnRead(UvStreamHandle handle, int status) else { // We're done writing, the reading is gone - _input.CompleteWriter(); + _input.Writer.Complete(); } } } @@ -230,7 +230,7 @@ private async void OnRead(UvStreamHandle handle, int status) if (normalDone) { - _input.CompleteWriter(); + _input.Writer.Complete(); } } @@ -241,7 +241,7 @@ private static Uv.uv_buf_t AllocCallback(UvStreamHandle handle, int status, obje private unsafe Uv.uv_buf_t OnAlloc(UvStreamHandle handle, int status) { - var inputBuffer = _input.Alloc(2048); + var inputBuffer = _input.Writer.Alloc(2048); _inputBuffer = inputBuffer; diff --git a/src/System.IO.Pipelines.Networking.Libuv/UvThread.cs b/src/System.IO.Pipelines.Networking.Libuv/UvThread.cs index ab190768bee..99811876359 100644 --- a/src/System.IO.Pipelines.Networking.Libuv/UvThread.cs +++ b/src/System.IO.Pipelines.Networking.Libuv/UvThread.cs @@ -32,7 +32,7 @@ public UvThread() public UvLoopHandle Loop { get; private set; } - public PipelineFactory PipelineFactory { get; } = new PipelineFactory(); + public PipeFactory PipeFactory { get; } = new PipeFactory(); public WriteReqPool WriteReqPool { get; } @@ -131,7 +131,7 @@ public void Dispose() { Stop(); - PipelineFactory.Dispose(); + PipeFactory.Dispose(); } public void Schedule(Action action) diff --git a/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs b/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs index 0f909a863b1..a32f70bbc1b 100644 --- a/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs +++ b/src/System.IO.Pipelines.Networking.Sockets/SocketConnection.cs @@ -14,9 +14,9 @@ namespace System.IO.Pipelines.Networking.Sockets { /// - /// Represents an implementation using the async Socket API + /// Represents an implementation using the async Socket API /// - public class SocketConnection : IPipelineConnection + public class SocketConnection : IPipeConnection { private static readonly EventHandler _asyncCompleted = OnAsyncCompleted; @@ -37,8 +37,8 @@ public class SocketConnection : IPipelineConnection private readonly bool _ownsFactory; - private PipelineFactory _factory; - private Pipe _input, _output; + private PipeFactory _factory; + private IPipe _input, _output; private Socket _socket; private Task _receiveTask; private Task _sendTask; @@ -72,19 +72,19 @@ static SocketConnection() } } - internal SocketConnection(Socket socket, PipelineFactory factory) + internal SocketConnection(Socket socket, PipeFactory factory) { socket.NoDelay = true; _socket = socket; if (factory == null) { _ownsFactory = true; - factory = new PipelineFactory(); + factory = new PipeFactory(); } _factory = factory; - _input = PipelineFactory.Create(); - _output = PipelineFactory.Create(); + _input = PipeFactory.Create(); + _output = PipeFactory.Create(); _receiveTask = ReceiveFromSocketAndPushToWriterAsync(); _sendTask = ReadFromReaderAndWriteToSocketAsync(); @@ -93,14 +93,14 @@ internal SocketConnection(Socket socket, PipelineFactory factory) /// /// Provides access to data received from the socket /// - public IPipelineReader Input => _input; + public IPipeReader Input => _input.Reader; /// /// Provides access to write data to the socket /// - public IPipelineWriter Output => _output; + public IPipeWriter Output => _output.Writer; - private PipelineFactory PipelineFactory => _factory; + private PipeFactory PipeFactory => _factory; private Socket Socket => _socket; @@ -108,8 +108,8 @@ internal SocketConnection(Socket socket, PipelineFactory factory) /// Begins an asynchronous connect operation to the designated endpoint /// /// The endpoint to which to connect - /// Optionally allows the underlying (and hence memory pool) to be specified; if one is not provided, a will be instantiated and owned by the connection - public static Task ConnectAsync(IPEndPoint endPoint, PipelineFactory factory = null) + /// Optionally allows the underlying (and hence memory pool) to be specified; if one is not provided, a will be instantiated and owned by the connection + public static Task ConnectAsync(IPEndPoint endPoint, PipeFactory factory = null) { var args = new SocketAsyncEventArgs(); args.RemoteEndPoint = endPoint; @@ -175,12 +175,12 @@ protected virtual void Dispose(bool disposing) if (disposing) { _stopping = true; - _output.CancelPendingRead(); + _output.Reader.CancelPendingRead(); Task.WaitAll(_sendTask, _receiveTask); - _output.CompleteWriter(); - _input.CompleteReader(); + _output.Writer.Complete(); + _input.Reader.Complete(); GC.SuppressFinalize(this); @@ -217,7 +217,7 @@ private static void OnConnect(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { - tcs.TrySetResult(new SocketConnection(e.ConnectSocket, (PipelineFactory)tcs.Task.AsyncState)); + tcs.TrySetResult(new SocketConnection(e.ConnectSocket, (PipeFactory)tcs.Task.AsyncState)); } else { @@ -339,7 +339,7 @@ private async Task ReceiveFromSocketAndPushToWriterAsync() // certainly want to coalesce the initial buffer (from the speculative receive) with the initial // data, but we probably don't want to buffer indefinitely; for now, it will buffer up to 4 pages // before flushing (entirely arbitrarily) - might want to make this configurable later - buffer = _input.Alloc(SmallBufferSize * 2); + buffer = _input.Writer.Alloc(SmallBufferSize * 2); haveWriteBuffer = true; const int FlushInputEveryBytes = 4 * MemoryPool.MaxPooledBlockLength; @@ -393,7 +393,7 @@ private async Task ReceiveFromSocketAndPushToWriterAsync() } } } - _input.CompleteWriter(); + _input.Writer.Complete(); } catch (Exception ex) { @@ -403,7 +403,7 @@ private async Task ReceiveFromSocketAndPushToWriterAsync() { args.UserToken = null; } - _input?.CompleteWriter(ex); + _input?.Writer.Complete(ex); } finally { @@ -528,7 +528,7 @@ private async Task ReadFromReaderAndWriteToSocketAsync() while (!_stopping) { - var result = await _output.ReadAsync(); + var result = await _output.Reader.ReadAsync(); var buffer = result.Buffer; try { @@ -559,10 +559,10 @@ private async Task ReadFromReaderAndWriteToSocketAsync() } finally { - _output.Advance(buffer.End); + _output.Reader.Advance(buffer.End); } } - _output.CompleteReader(); + _output.Reader.Complete(); } catch (Exception ex) { @@ -572,7 +572,7 @@ private async Task ReadFromReaderAndWriteToSocketAsync() { args.UserToken = null; } - _output?.CompleteReader(ex); + _output?.Reader.Complete(ex); } finally { diff --git a/src/System.IO.Pipelines.Networking.Sockets/SocketListener.cs b/src/System.IO.Pipelines.Networking.Sockets/SocketListener.cs index a5a59b38414..e5613aac6dc 100644 --- a/src/System.IO.Pipelines.Networking.Sockets/SocketListener.cs +++ b/src/System.IO.Pipelines.Networking.Sockets/SocketListener.cs @@ -16,19 +16,19 @@ public class SocketListener : IDisposable private readonly bool _ownsFactory; private Socket _socket; private Socket Socket => _socket; - private PipelineFactory _factory; - private PipelineFactory PipelineFactory => _factory; + private PipeFactory _factory; + private PipeFactory PipeFactory => _factory; private Func Callback { get; set; } static readonly EventHandler _asyncCompleted = OnAsyncCompleted; /// /// Creates a new SocketListener instance /// - /// Optionally allows the underlying (and hence memory pool) to be specified; if one is not provided, a will be instantiated and owned by the listener - public SocketListener(PipelineFactory factory = null) + /// Optionally allows the underlying (and hence memory pool) to be specified; if one is not provided, a will be instantiated and owned by the listener + public SocketListener(PipeFactory factory = null) { _ownsFactory = factory == null; - _factory = factory ?? new PipelineFactory(); + _factory = factory ?? new PipeFactory(); } /// @@ -128,7 +128,7 @@ private void OnAccept(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { - var conn = new SocketConnection(e.AcceptSocket, PipelineFactory); + var conn = new SocketConnection(e.AcceptSocket, PipeFactory); e.AcceptSocket = null; ExecuteConnection(conn); } diff --git a/src/System.IO.Pipelines.Networking.Windows.RIO/Internal/RioThread.cs b/src/System.IO.Pipelines.Networking.Windows.RIO/Internal/RioThread.cs index 6c82d626c72..b8a77f880a2 100644 --- a/src/System.IO.Pipelines.Networking.Windows.RIO/Internal/RioThread.cs +++ b/src/System.IO.Pipelines.Networking.Windows.RIO/Internal/RioThread.cs @@ -27,7 +27,7 @@ internal unsafe class RioThread private readonly Queue _notifyBatches; private readonly Queue _processedBatches; - private PipelineFactory _factory; + private PipeFactory _factory; private Dictionary _connections; private List _bufferIdMappings; @@ -37,7 +37,7 @@ internal unsafe class RioThread public IntPtr CompletionPort => _completionPort; - public PipelineFactory PipelineFactory => _factory; + public PipeFactory PipeFactory => _factory; public RioThread(int id, CancellationToken token, IntPtr completionPort, IntPtr completionQueue, RegisteredIO rio) { @@ -174,7 +174,7 @@ private static void RunLogicalCompletions(object state) var memoryPool = new MemoryPool(); memoryPool.RegisterSlabAllocationCallback((slab) => thread.OnSlabAllocated(slab)); memoryPool.RegisterSlabDeallocationCallback((slab) => thread.OnSlabDeallocated(slab)); - thread._factory = new PipelineFactory(memoryPool); + thread._factory = new PipeFactory(memoryPool); thread.ProcessLogicalCompletions(); @@ -200,7 +200,7 @@ private static void RunPhysicalCompletions(object state) var memoryPool = new MemoryPool(); memoryPool.RegisterSlabAllocationCallback((slab) => thread.OnSlabAllocated(slab)); memoryPool.RegisterSlabDeallocationCallback((slab) => thread.OnSlabDeallocated(slab)); - thread._factory = new PipelineFactory(memoryPool); + thread._factory = new PipeFactory(memoryPool); thread.ProcessPhysicalCompletions(); diff --git a/src/System.IO.Pipelines.Networking.Windows.RIO/RioTcpConnection.cs b/src/System.IO.Pipelines.Networking.Windows.RIO/RioTcpConnection.cs index 5c6e8de52f9..bb5bb0c92ed 100644 --- a/src/System.IO.Pipelines.Networking.Windows.RIO/RioTcpConnection.cs +++ b/src/System.IO.Pipelines.Networking.Windows.RIO/RioTcpConnection.cs @@ -10,7 +10,7 @@ namespace System.IO.Pipelines.Networking.Windows.RIO { - public sealed class RioTcpConnection : IPipelineConnection + public sealed class RioTcpConnection : IPipeConnection { private const RioSendFlags MessagePart = RioSendFlags.Defer | RioSendFlags.DontNotify; private const RioSendFlags MessageEnd = RioSendFlags.None; @@ -29,8 +29,8 @@ public sealed class RioTcpConnection : IPipelineConnection private long _previousSendCorrelation = RestartSendCorrelations; - private readonly Pipe _input; - private readonly Pipe _output; + private readonly IPipe _input; + private readonly IPipe _output; private readonly SemaphoreSlim _outgoingSends = new SemaphoreSlim(RioTcpServer.MaxWritesPerSocket); private readonly SemaphoreSlim _previousSendsComplete = new SemaphoreSlim(1); @@ -47,8 +47,8 @@ internal RioTcpConnection(IntPtr socket, long connectionId, IntPtr requestQueue, _rio = rio; _rioThread = rioThread; - _input = rioThread.PipelineFactory.Create(); - _output = rioThread.PipelineFactory.Create(); + _input = rioThread.PipeFactory.Create(); + _output = rioThread.PipeFactory.Create(); _requestQueue = requestQueue; @@ -58,12 +58,12 @@ internal RioTcpConnection(IntPtr socket, long connectionId, IntPtr requestQueue, _sendTask = ProcessSends(); } - public IPipelineReader Input => _input; - public IPipelineWriter Output => _output; + public IPipeReader Input => _input.Reader; + public IPipeWriter Output => _output.Writer; private void ProcessReceives() { - _buffer = _input.Alloc(2048); + _buffer = _input.Writer.Alloc(2048); var receiveBufferSeg = GetSegmentFromMemory(_buffer.Memory); if (!_rio.RioReceive(_requestQueue, ref receiveBufferSeg, 1, RioReceiveFlags.None, 0)) @@ -76,7 +76,7 @@ private async Task ProcessSends() { while (true) { - var result = await _output.ReadAsync(); + var result = await _output.Reader.ReadAsync(); var buffer = result.Buffer; if (buffer.IsEmpty && result.IsCompleted) @@ -105,10 +105,10 @@ private async Task ProcessSends() await SendAsync(current, endOfMessage: true); } - _output.Advance(buffer.End); + _output.Reader.Advance(buffer.End); } - _output.CompleteReader(); + _output.Reader.Complete(); } private Task SendAsync(Memory memory, bool endOfMessage) @@ -203,7 +203,7 @@ public void ReceiveBeginComplete(uint bytesTransferred) { if (bytesTransferred == 0) { - _input.CompleteWriter(); + _input.Writer.Complete(); } else { diff --git a/src/System.IO.Pipelines.Text.Primitives/PipelineTextOutput.cs b/src/System.IO.Pipelines.Text.Primitives/PipelineTextOutput.cs index 50926a4cff8..48a041b3275 100644 --- a/src/System.IO.Pipelines.Text.Primitives/PipelineTextOutput.cs +++ b/src/System.IO.Pipelines.Text.Primitives/PipelineTextOutput.cs @@ -12,11 +12,11 @@ namespace System.IO.Pipelines.Text.Primitives { public class PipelineTextOutput : ITextOutput { - private readonly IPipelineWriter _writer; + private readonly IPipeWriter _writer; private WritableBuffer _writableBuffer; private bool _needAlloc = true; - public PipelineTextOutput(IPipelineWriter writer, EncodingData encoding) + public PipelineTextOutput(IPipeWriter writer, EncodingData encoding) { _writer = writer; Encoding = encoding; diff --git a/src/System.IO.Pipelines.Text.Primitives/PipelineWriterExtensions.cs b/src/System.IO.Pipelines.Text.Primitives/PipelineWriterExtensions.cs index 1e0ef82dbb9..0945c3d1e9d 100644 --- a/src/System.IO.Pipelines.Text.Primitives/PipelineWriterExtensions.cs +++ b/src/System.IO.Pipelines.Text.Primitives/PipelineWriterExtensions.cs @@ -8,7 +8,7 @@ namespace System.IO.Pipelines.Text.Primitives { public static class PipelineWriterExtensions { - public static PipelineTextOutput AsTextOutput(this IPipelineWriter writer, EncodingData formattingData) + public static PipelineTextOutput AsTextOutput(this IPipeWriter writer, EncodingData formattingData) { return new PipelineTextOutput(writer, formattingData); } diff --git a/src/System.IO.Pipelines/DefaultReadableBufferExtensions.cs b/src/System.IO.Pipelines/DefaultReadableBufferExtensions.cs index 2e811defc75..95e02c2ae4f 100644 --- a/src/System.IO.Pipelines/DefaultReadableBufferExtensions.cs +++ b/src/System.IO.Pipelines/DefaultReadableBufferExtensions.cs @@ -54,7 +54,7 @@ private static async Task WriteToStream(Stream stream, Memory memory) } } - public static async Task ReadToEndAsync(this IPipelineReader input) + public static async Task ReadToEndAsync(this IPipeReader input) { while (true) { diff --git a/src/System.IO.Pipelines/IPipe.cs b/src/System.IO.Pipelines/IPipe.cs new file mode 100644 index 00000000000..9a205f39dca --- /dev/null +++ b/src/System.IO.Pipelines/IPipe.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; + +namespace System.IO.Pipelines +{ + public interface IPipe + { + IPipeReader Reader { get; } + IPipeWriter Writer { get; } + Task ReadingStarted { get; } + } +} \ No newline at end of file diff --git a/src/System.IO.Pipelines/IPipelineConnection.cs b/src/System.IO.Pipelines/IPipeConnection.cs similarity index 59% rename from src/System.IO.Pipelines/IPipelineConnection.cs rename to src/System.IO.Pipelines/IPipeConnection.cs index a32d4bf4b42..5ac5477b44a 100644 --- a/src/System.IO.Pipelines/IPipelineConnection.cs +++ b/src/System.IO.Pipelines/IPipeConnection.cs @@ -8,16 +8,16 @@ namespace System.IO.Pipelines /// /// Defines a class that provides a connection from which data can be read from and written to. /// - public interface IPipelineConnection : IDisposable + public interface IPipeConnection : IDisposable { /// - /// Gets the half of the duplex connection. + /// Gets the half of the duplex connection. /// - IPipelineReader Input { get; } + IPipeReader Input { get; } /// - /// Gets the half of the duplex connection. + /// Gets the half of the duplex connection. /// - IPipelineWriter Output { get; } + IPipeWriter Output { get; } } } diff --git a/src/System.IO.Pipelines/IPipelineReader.cs b/src/System.IO.Pipelines/IPipeReader.cs similarity index 94% rename from src/System.IO.Pipelines/IPipelineReader.cs rename to src/System.IO.Pipelines/IPipeReader.cs index 953222d9080..d122d750a2c 100644 --- a/src/System.IO.Pipelines/IPipelineReader.cs +++ b/src/System.IO.Pipelines/IPipeReader.cs @@ -8,10 +8,10 @@ namespace System.IO.Pipelines /// /// Defines a class that provides a pipeline from which data can be read. /// - public interface IPipelineReader + public interface IPipeReader { /// - /// Asynchronously reads a sequence of bytes from the current . + /// Asynchronously reads a sequence of bytes from the current . /// /// A representing the asynchronous read operation. ReadableBufferAwaitable ReadAsync(); @@ -28,7 +28,7 @@ public interface IPipelineReader void Advance(ReadCursor consumed, ReadCursor examined); /// - /// Cancel to currently pending or next call to if none is pending, without completing the . + /// Cancel to currently pending or next call to if none is pending, without completing the . /// void CancelPendingRead(); diff --git a/src/System.IO.Pipelines/IPipelineWriter.cs b/src/System.IO.Pipelines/IPipeWriter.cs similarity index 96% rename from src/System.IO.Pipelines/IPipelineWriter.cs rename to src/System.IO.Pipelines/IPipeWriter.cs index 036a6756388..c7cc26428d9 100644 --- a/src/System.IO.Pipelines/IPipelineWriter.cs +++ b/src/System.IO.Pipelines/IPipeWriter.cs @@ -9,7 +9,7 @@ namespace System.IO.Pipelines /// /// Defines a class that provides a pipeline to which data can be written. /// - public interface IPipelineWriter + public interface IPipeWriter { /// /// Allocates memory from the pipeline to write into. diff --git a/src/System.IO.Pipelines/Pipe.cs b/src/System.IO.Pipelines/Pipe.cs index 51d42877419..3eaf8900b49 100644 --- a/src/System.IO.Pipelines/Pipe.cs +++ b/src/System.IO.Pipelines/Pipe.cs @@ -8,9 +8,9 @@ namespace System.IO.Pipelines { /// - /// Default and implementation. + /// Default and implementation. /// - public class Pipe : IPipelineReader, IPipelineWriter, IReadableBufferAwaiter, IWritableBufferAwaiter + internal class Pipe : IPipe, IPipeReader, IPipeWriter, IReadableBufferAwaiter, IWritableBufferAwaiter { private static readonly Action _awaitableIsCompleted = () => { }; private static readonly Action _awaitableIsNotCompleted = () => { }; @@ -100,7 +100,7 @@ public Pipe(IBufferPool pool, PipeOptions options = null) } /// - /// A that completes when the consumer starts consuming the . + /// A that completes when the consumer starts consuming the . /// public Task ReadingStarted => _startingReadingTcs.Task; @@ -126,19 +126,8 @@ public Pipe(IBufferPool pool, PipeOptions options = null) /// /// The minimum size buffer to allocate /// A that can be written to. - public WritableBuffer Alloc(int minimumSize = 0) + WritableBuffer IPipeWriter.Alloc(int minimumSize) { - // CompareExchange not required as its setting to current value if test fails - if (Interlocked.Exchange(ref _producingState, State.Active) != State.NotActive) - { - - ThrowHelper.ThrowInvalidOperationException(ExceptionResource.AlreadyProducing -#if PRODUCING_LOCATION_TRACKING - , _producingLocation -#endif - ); - } - #if PRODUCING_LOCATION_TRACKING _producingLocation = Environment.StackTrace; #endif @@ -155,8 +144,19 @@ public WritableBuffer Alloc(int minimumSize = 0) { ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize); } - else if (minimumSize > 0) + + // CompareExchange not required as its setting to current value if test fails + if (Interlocked.Exchange(ref _producingState, State.Active) != State.NotActive) { + ThrowHelper.ThrowInvalidOperationException(ExceptionResource.AlreadyProducing +#if PRODUCING_LOCATION_TRACKING + , _producingLocation +#endif + ); + } + + if (minimumSize > 0) + { try { AllocateWriteHead(minimumSize); @@ -333,7 +333,7 @@ internal void Commit() #endif } - public void AdvanceWriter(int bytesWritten) + internal void AdvanceWriter(int bytesWritten) { EnsureAlloc(); @@ -412,14 +412,12 @@ private ReadableBuffer Read() return new ReadableBuffer(new ReadCursor(head), readEnd); } - - void IPipelineWriter.Complete(Exception exception) => CompleteWriter(exception); - + /// /// Marks the pipeline as being complete, meaning no more items will be written to it. /// /// Optional Exception indicating a failure that's causing the pipeline to complete. - public void CompleteWriter(Exception exception = null) + void IPipeWriter.Complete(Exception exception) { if (_producingState != State.NotActive) { @@ -435,7 +433,7 @@ public void CompleteWriter(Exception exception = null) // TODO: Review this lock? lock (_sync) { - SignalReader(exception); + Complete(_readingTcs, exception); Resume(_readerScheduler, ref _readerCallback); @@ -446,7 +444,9 @@ public void CompleteWriter(Exception exception = null) } } - public void AdvanceReader(ReadCursor consumed, ReadCursor examined) + // Reading + + void IPipeReader.Advance(ReadCursor consumed, ReadCursor examined) { BufferSegment returnStart = null; BufferSegment returnEnd = null; @@ -508,29 +508,11 @@ public void AdvanceReader(ReadCursor consumed, ReadCursor examined) } } - private void SignalWriter(Exception exception) - { - if (exception != null) - { - _writingTcs.TrySetException(exception); - } - else - { - _writingTcs.TrySetResult(null); - } - } - - // Reading - - void IPipelineReader.Complete(Exception exception) => CompleteReader(exception); - - void IPipelineReader.Advance(ReadCursor consumed, ReadCursor examined) => AdvanceReader(consumed, examined); - /// /// Signal to the producer that the consumer is done reading. /// /// Optional Exception indicating a failure that's causing the pipeline to complete. - public void CompleteReader(Exception exception = null) + void IPipeReader.Complete(Exception exception) { if (_consumingState != State.NotActive) { @@ -549,7 +531,7 @@ public void CompleteReader(Exception exception = null) // Trigger this if it's never been triggered _startingReadingTcs.TrySetResult(null); - SignalWriter(exception); + Complete(_writingTcs, exception); Resume(_writerScheduler, ref _writerCallback); @@ -561,9 +543,9 @@ public void CompleteReader(Exception exception = null) } /// - /// Cancel to currently pending call to without completing the . + /// Cancel to currently pending call to without completing the . /// - public void CancelPendingRead() + void IPipeReader.CancelPendingRead() { // TODO: Can factor out this lock lock (_sync) @@ -576,10 +558,10 @@ public void CancelPendingRead() } /// - /// Asynchronously reads a sequence of bytes from the current . + /// Asynchronously reads a sequence of bytes from the current . /// /// A representing the asynchronous read operation. - public ReadableBufferAwaitable ReadAsync() + ReadableBufferAwaitable IPipeReader.ReadAsync() { if (Writing.IsCompleted) { @@ -594,17 +576,6 @@ public ReadableBufferAwaitable ReadAsync() return new ReadableBufferAwaitable(this); } - private void SignalReader(Exception exception) - { - if (exception != null) - { - _readingTcs.TrySetException(exception); - } - else - { - _readingTcs.TrySetResult(null); - } - } // Awaiter support members @@ -683,6 +654,18 @@ private static void Reset(ref Action awaitableState) _awaitableIsNotCompleted, _awaitableIsCompleted); } + + private void Complete(TaskCompletionSource taskCompletionSource, Exception exception) + { + if (exception != null) + { + taskCompletionSource.TrySetException(exception); + } + else + { + taskCompletionSource.TrySetResult(null); + } + } private void Dispose() { @@ -731,7 +714,7 @@ ReadResult IReadableBufferAwaiter.GetResult() return new ReadResult(Read(), isCancelled, isCompleted); } - // IFlushAwaiter members + // IWritableBufferAwaiter members bool IWritableBufferAwaiter.IsCompleted => IsCompleted(_writerCallback); @@ -749,6 +732,9 @@ void IWritableBufferAwaiter.OnCompleted(Action continuation) OnCompleted(continuation, _writerScheduler, ref _writerCallback, _writingTcs); } + public IPipeReader Reader => this; + public IPipeWriter Writer => this; + // Can't use enums with Interlocked private static class State { diff --git a/src/System.IO.Pipelines/PipelineFactory.cs b/src/System.IO.Pipelines/PipeFactory.cs similarity index 69% rename from src/System.IO.Pipelines/PipelineFactory.cs rename to src/System.IO.Pipelines/PipeFactory.cs index 9d3a07654f3..d5e059621b7 100644 --- a/src/System.IO.Pipelines/PipelineFactory.cs +++ b/src/System.IO.Pipelines/PipeFactory.cs @@ -14,30 +14,30 @@ namespace System.IO.Pipelines /// /// Factory used to creaet instances of various pipelines. /// - public class PipelineFactory : IDisposable + public class PipeFactory : IDisposable { private readonly IBufferPool _pool; - public PipelineFactory() : this(new MemoryPool()) + public PipeFactory() : this(new MemoryPool()) { } - public PipelineFactory(IBufferPool pool) + public PipeFactory(IBufferPool pool) { _pool = pool; } - public Pipe Create() + public IPipe Create() { return new Pipe(_pool); } - public Pipe Create(PipeOptions options) + public IPipe Create(PipeOptions options) { return new Pipe(_pool, options); } - public IPipelineReader CreateReader(Stream stream) + public IPipeReader CreateReader(Stream stream) { if (!stream.CanRead) { @@ -56,12 +56,12 @@ private async void ExecuteCopyToAsync(Pipe pipe, Stream stream) await stream.CopyToAsync(pipe); } - public IPipelineConnection CreateConnection(NetworkStream stream) + public IPipeConnection CreateConnection(NetworkStream stream) { - return new StreamPipelineConnection(this, stream); + return new StreamPipeConnection(this, stream); } - public IPipelineWriter CreateWriter(Stream stream) + public IPipeWriter CreateWriter(Stream stream) { if (!stream.CanWrite) { @@ -75,11 +75,11 @@ public IPipelineWriter CreateWriter(Stream stream) var innerPipe = (Pipe)state; if (task.IsFaulted) { - innerPipe.CompleteReader(task.Exception.InnerException); + innerPipe.Reader.Complete(task.Exception.InnerException); } else { - innerPipe.CompleteReader(); + innerPipe.Reader.Complete(); } }, pipe, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); @@ -87,7 +87,7 @@ public IPipelineWriter CreateWriter(Stream stream) return pipe; } - public IPipelineWriter CreateWriter(IPipelineWriter writer, Func consume) + public IPipeWriter CreateWriter(IPipeWriter writer, Func consume) { var pipe = new Pipe(_pool); @@ -98,14 +98,14 @@ public IPipelineWriter CreateWriter(IPipelineWriter writer, Func produce) + public IPipeReader CreateReader(IPipeReader reader, Func produce) { var pipe = new Pipe(_pool); Execute(reader, pipe, produce); return pipe; } - private async void Execute(IPipelineReader reader, Pipe pipe, Func produce) + private async void Execute(IPipeReader reader, Pipe pipe, Func produce) { await pipe.ReadingStarted; diff --git a/src/System.IO.Pipelines/PipelineReader.cs b/src/System.IO.Pipelines/PipeReader.cs similarity index 69% rename from src/System.IO.Pipelines/PipelineReader.cs rename to src/System.IO.Pipelines/PipeReader.cs index 800d2c6fcc4..c89098a92e8 100644 --- a/src/System.IO.Pipelines/PipelineReader.cs +++ b/src/System.IO.Pipelines/PipeReader.cs @@ -9,31 +9,31 @@ namespace System.IO.Pipelines /// /// Represents a pipeline from which data can be read. /// - public abstract class PipelineReader : IPipelineReader + public abstract class PipeReader : IPipeReader { /// - /// The underlying the communicates over. - /// - protected readonly Pipe _pipe; - - /// - /// Creates a base . + /// Creates a base . /// /// The that buffers will be allocated from. - protected PipelineReader(IBufferPool pool) + protected PipeReader(IBufferPool pool) { - _pipe = new Pipe(pool); + Pipe = new Pipe(pool); } /// - /// Creates a base . + /// Creates a base . /// - /// The the communicates over. - protected PipelineReader(Pipe pipe) + /// The the communicates over. + protected PipeReader(IPipe pipe) { - _pipe = pipe; + Pipe = pipe; } + /// + /// The underlying the communicates over. + /// + protected IPipe Pipe { get; } + /// /// Moves forward the pipelines read cursor to after the consumed data. /// @@ -43,23 +43,23 @@ protected PipelineReader(Pipe pipe) /// The memory for the consumed data will be released and no longer available. /// The examined data communicates to the pipeline when it should signal more data is available. /// - public void Advance(ReadCursor consumed, ReadCursor examined) => _pipe.AdvanceReader(consumed, examined); + public void Advance(ReadCursor consumed, ReadCursor examined) => Pipe.Reader.Advance(consumed, examined); /// /// Cancel to currently pending call to /// - public void CancelPendingRead() => _pipe.CancelPendingRead(); + public void CancelPendingRead() => Pipe.Reader.CancelPendingRead(); /// /// Signal to the producer that the consumer is done reading. /// /// Optional Exception indicating a failure that's causing the pipeline to complete. - public void Complete(Exception exception = null) => _pipe.CompleteReader(exception); + public void Complete(Exception exception = null) => Pipe.Reader.Complete(exception); /// - /// Asynchronously reads a sequence of bytes from the current . + /// Asynchronously reads a sequence of bytes from the current . /// /// A representing the asynchronous read operation. - public ReadableBufferAwaitable ReadAsync() => _pipe.ReadAsync(); + public ReadableBufferAwaitable ReadAsync() => Pipe.Reader.ReadAsync(); } } diff --git a/src/System.IO.Pipelines/PipelineWriter.cs b/src/System.IO.Pipelines/PipeWriter.cs similarity index 83% rename from src/System.IO.Pipelines/PipelineWriter.cs rename to src/System.IO.Pipelines/PipeWriter.cs index c31c66b27bd..6cb62a6b675 100644 --- a/src/System.IO.Pipelines/PipelineWriter.cs +++ b/src/System.IO.Pipelines/PipeWriter.cs @@ -6,11 +6,11 @@ namespace System.IO.Pipelines { - public abstract class PipelineWriter : IPipelineWriter + public abstract class PipeWriter : IPipeWriter { private readonly Pipe _pipe; - public PipelineWriter(IBufferPool pool) + public PipeWriter(IBufferPool pool) { _pipe = new Pipe(pool); @@ -19,11 +19,11 @@ public PipelineWriter(IBufferPool pool) protected abstract Task WriteAsync(ReadableBuffer buffer); - public WritableBuffer Alloc(int minimumSize = 0) => _pipe.Alloc(minimumSize); + public WritableBuffer Alloc(int minimumSize = 0) => _pipe.Writer.Alloc(minimumSize); - public void Complete(Exception exception = null) => _pipe.CompleteWriter(exception); + public void Complete(Exception exception = null) => _pipe.Writer.Complete(exception); - private async void Consume(IPipelineReader input) + private async void Consume(IPipeReader input) { while (true) { diff --git a/src/System.IO.Pipelines/PipelineConnectionExtensions.cs b/src/System.IO.Pipelines/PipelineConnectionExtensions.cs index 485f6d33820..a131296bb54 100644 --- a/src/System.IO.Pipelines/PipelineConnectionExtensions.cs +++ b/src/System.IO.Pipelines/PipelineConnectionExtensions.cs @@ -11,7 +11,7 @@ namespace System.IO.Pipelines { public static class PipelineConnectionExtensions { - public static Stream GetStream(this IPipelineConnection connection) + public static Stream GetStream(this IPipeConnection connection) { return new PipelineConnectionStream(connection); } @@ -19,7 +19,7 @@ public static Stream GetStream(this IPipelineConnection connection) public static class PipelineWriterExtensions { - public static async Task WriteAsync(this IPipelineWriter output, Span source) + public static async Task WriteAsync(this IPipeWriter output, Span source) { var writeBuffer = output.Alloc(); writeBuffer.Write(source); @@ -29,12 +29,12 @@ public static async Task WriteAsync(this IPipelineWriter output, Span sour public static class PipelineReaderExtensions { - public static void Advance(this IPipelineReader input, ReadCursor cursor) + public static void Advance(this IPipeReader input, ReadCursor cursor) { input.Advance(cursor, cursor); } - public static ValueTask ReadAsync(this IPipelineReader input, Span destination) + public static ValueTask ReadAsync(this IPipeReader input, Span destination) { while (true) { @@ -67,12 +67,12 @@ public static ValueTask ReadAsync(this IPipelineReader input, Span de return new ValueTask(input.ReadAsyncAwaited(destination)); } - public static Task CopyToAsync(this IPipelineReader input, Stream stream) + public static Task CopyToAsync(this IPipeReader input, Stream stream) { return input.CopyToAsync(stream, 4096, CancellationToken.None); } - public static async Task CopyToAsync(this IPipelineReader input, Stream stream, int bufferSize, CancellationToken cancellationToken) + public static async Task CopyToAsync(this IPipeReader input, Stream stream, int bufferSize, CancellationToken cancellationToken) { // TODO: Use bufferSize argument while (!cancellationToken.IsCancellationRequested) @@ -95,7 +95,7 @@ public static async Task CopyToAsync(this IPipelineReader input, Stream stream, } } - public static async Task CopyToAsync(this IPipelineReader input, IPipelineWriter output) + public static async Task CopyToAsync(this IPipeReader input, IPipeWriter output) { while (true) { @@ -124,7 +124,7 @@ public static async Task CopyToAsync(this IPipelineReader input, IPipelineWriter } } - private static async Task ReadAsyncAwaited(this IPipelineReader input, Span destination) + private static async Task ReadAsyncAwaited(this IPipeReader input, Span destination) { while (true) { diff --git a/src/System.IO.Pipelines/PipelineConnectionStream.cs b/src/System.IO.Pipelines/PipelineConnectionStream.cs index 12e6b8ccc72..5dd70a04e2d 100644 --- a/src/System.IO.Pipelines/PipelineConnectionStream.cs +++ b/src/System.IO.Pipelines/PipelineConnectionStream.cs @@ -13,9 +13,9 @@ public class PipelineConnectionStream : Stream private readonly static Task _initialCachedTask = Task.FromResult(0); private Task _cachedTask = _initialCachedTask; - private readonly IPipelineConnection _connection; + private readonly IPipeConnection _connection; - public PipelineConnectionStream(IPipelineConnection connection) + public PipelineConnectionStream(IPipeConnection connection) { _connection = connection; } diff --git a/src/System.IO.Pipelines/ReadResult.cs b/src/System.IO.Pipelines/ReadResult.cs index f3c2e06e33a..75b74f79c6a 100644 --- a/src/System.IO.Pipelines/ReadResult.cs +++ b/src/System.IO.Pipelines/ReadResult.cs @@ -4,7 +4,7 @@ namespace System.IO.Pipelines { /// - /// The result of a call. + /// The result of a call. /// public struct ReadResult { @@ -26,7 +26,7 @@ public ReadResult(ReadableBuffer buffer, bool isCancelled, bool isCompleted) public bool IsCancelled { get; } /// - /// True if the is complete + /// True if the is complete /// public bool IsCompleted { get; } } diff --git a/src/System.IO.Pipelines/ReadableBuffer.cs b/src/System.IO.Pipelines/ReadableBuffer.cs index 321bee0f0e6..4a31c55a497 100644 --- a/src/System.IO.Pipelines/ReadableBuffer.cs +++ b/src/System.IO.Pipelines/ReadableBuffer.cs @@ -306,7 +306,7 @@ public int Peek() } /// - /// This transfers ownership of the buffer from the to the caller of this method. Preserved buffers must be disposed to avoid + /// This transfers ownership of the buffer from the to the caller of this method. Preserved buffers must be disposed to avoid /// memory leaks. /// public PreservedBuffer Preserve() diff --git a/src/System.IO.Pipelines/StreamExtensions.cs b/src/System.IO.Pipelines/StreamExtensions.cs index 771c168a0f3..3594eb4b271 100644 --- a/src/System.IO.Pipelines/StreamExtensions.cs +++ b/src/System.IO.Pipelines/StreamExtensions.cs @@ -11,22 +11,22 @@ namespace System.IO.Pipelines public static class StreamExtensions { /// - /// Adapts a into a . + /// Adapts a into a . /// /// /// - public static IPipelineWriter AsPipelineWriter(this Stream stream) + public static IPipeWriter AsPipelineWriter(this Stream stream) { - return (stream as IPipelineWriter) ?? stream.AsPipelineWriter(ArrayBufferPool.Instance); + return (stream as IPipeWriter) ?? stream.AsPipelineWriter(ArrayBufferPool.Instance); } /// - /// Adapts a into a . + /// Adapts a into a . /// /// /// /// - public static IPipelineWriter AsPipelineWriter(this Stream stream, IBufferPool pool) + public static IPipeWriter AsPipelineWriter(this Stream stream, IBufferPool pool) { var pipe = new Pipe(pool); pipe.CopyToAsync(stream).ContinueWith((task, state) => @@ -35,11 +35,11 @@ public static IPipelineWriter AsPipelineWriter(this Stream stream, IBufferPool p if (task.IsFaulted) { - innerPipe.CompleteReader(task.Exception.InnerException); + innerPipe.Reader.Complete(task.Exception.InnerException); } else { - innerPipe.CompleteReader(); + innerPipe.Reader.Complete(); } }, pipe, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); @@ -48,23 +48,23 @@ public static IPipelineWriter AsPipelineWriter(this Stream stream, IBufferPool p } /// - /// Adapts a into a . + /// Adapts a into a . /// /// /// - public static IPipelineReader AsPipelineReader(this Stream stream) => AsPipelineReader(stream, CancellationToken.None); + public static IPipeReader AsPipelineReader(this Stream stream) => AsPipelineReader(stream, CancellationToken.None); /// - /// Adapts a into a . + /// Adapts a into a . /// /// /// /// - public static IPipelineReader AsPipelineReader(this Stream stream, CancellationToken cancellationToken) + public static IPipeReader AsPipelineReader(this Stream stream, CancellationToken cancellationToken) { - if (stream is IPipelineReader) + if (stream is IPipeReader) { - return (IPipelineReader)stream; + return (IPipeReader)stream; } var streamAdaptor = new UnownedBufferStream(stream); @@ -73,12 +73,12 @@ public static IPipelineReader AsPipelineReader(this Stream stream, CancellationT } /// - /// Copies the content of a into a . + /// Copies the content of a into a . /// /// /// /// - public static Task CopyToAsync(this Stream stream, IPipelineWriter writer) + public static Task CopyToAsync(this Stream stream, IPipeWriter writer) { return stream.CopyToAsync(new PipelineWriterStream(writer)); } @@ -88,7 +88,7 @@ private class UnownedBufferStream : Stream private readonly Stream _stream; private readonly UnownedBufferReader _reader; - public IPipelineReader Reader => _reader; + public IPipeReader Reader => _reader; public override bool CanRead => false; public override bool CanSeek => false; @@ -176,9 +176,9 @@ public override int Read(byte[] buffer, int offset, int count) private class PipelineWriterStream : Stream { - private IPipelineWriter _writer; + private IPipeWriter _writer; - public PipelineWriterStream(IPipelineWriter writer) + public PipelineWriterStream(IPipeWriter writer) { _writer = writer; } diff --git a/src/System.IO.Pipelines/StreamPipelineConnection.cs b/src/System.IO.Pipelines/StreamPipeConnection.cs similarity index 66% rename from src/System.IO.Pipelines/StreamPipelineConnection.cs rename to src/System.IO.Pipelines/StreamPipeConnection.cs index d1287d58228..58a2842931a 100644 --- a/src/System.IO.Pipelines/StreamPipelineConnection.cs +++ b/src/System.IO.Pipelines/StreamPipeConnection.cs @@ -6,17 +6,17 @@ namespace System.IO.Pipelines { - internal class StreamPipelineConnection : IPipelineConnection + internal class StreamPipeConnection : IPipeConnection { - public StreamPipelineConnection(PipelineFactory factory, Stream stream) + public StreamPipeConnection(PipeFactory factory, Stream stream) { Input = factory.CreateReader(stream); Output = factory.CreateWriter(stream); } - public IPipelineReader Input { get; } + public IPipeReader Input { get; } - public IPipelineWriter Output { get; } + public IPipeWriter Output { get; } public void Dispose() { diff --git a/src/System.IO.Pipelines/UnownedBufferReader.cs b/src/System.IO.Pipelines/UnownedBufferReader.cs index f2cf33d548a..277b906bb40 100644 --- a/src/System.IO.Pipelines/UnownedBufferReader.cs +++ b/src/System.IO.Pipelines/UnownedBufferReader.cs @@ -12,9 +12,9 @@ namespace System.IO.Pipelines { /// /// Works in buffers which it does not own, as opposed to using a . Designed - /// to allow Streams to be easily adapted to via + /// to allow Streams to be easily adapted to via /// - public class UnownedBufferReader : IPipelineReader, IReadableBufferAwaiter + public class UnownedBufferReader : IPipeReader, IReadableBufferAwaiter { private static readonly Action _awaitableIsCompleted = () => { }; private static readonly Action _awaitableIsNotCompleted = () => { }; @@ -45,7 +45,7 @@ public UnownedBufferReader() } /// - /// A that completes when the consumer starts consuming the . + /// A that completes when the consumer starts consuming the . /// public Task ReadingStarted => _startingReadingTcs.Task; @@ -181,7 +181,7 @@ private ReadableBuffer Read() } // Called by the READER - void IPipelineReader.Advance(ReadCursor consumed, ReadCursor examined) + void IPipeReader.Advance(ReadCursor consumed, ReadCursor examined) { BufferSegment returnStart = null; BufferSegment returnEnd = null; @@ -226,7 +226,7 @@ void IPipelineReader.Advance(ReadCursor consumed, ReadCursor examined) /// /// Optional Exception indicating a failure that's causing the pipeline to complete. // Called by the READER - void IPipelineReader.Complete(Exception exception) + void IPipeReader.Complete(Exception exception) { if (exception != null) { @@ -269,7 +269,7 @@ public void CompleteWriter(Exception exception = null) } /// - /// Cancel to currently pending call to without completing the . + /// Cancel to currently pending call to without completing the . /// public void CancelPendingRead() { @@ -279,7 +279,7 @@ public void CancelPendingRead() } /// - /// Asynchronously reads a sequence of bytes from the current . + /// Asynchronously reads a sequence of bytes from the current . /// /// A representing the asynchronous read operation. // Called by the READER diff --git a/src/System.IO.Pipelines/WritableBuffer.cs b/src/System.IO.Pipelines/WritableBuffer.cs index 5106216413d..a5995dadd23 100644 --- a/src/System.IO.Pipelines/WritableBuffer.cs +++ b/src/System.IO.Pipelines/WritableBuffer.cs @@ -77,7 +77,7 @@ public void Append(ReadableBuffer buffer) } /// - /// Moves forward the underlying 's write cursor but does not commit the data. + /// Moves forward the underlying 's write cursor but does not commit the data. /// /// number of bytes to be marked as written. /// Forwards the start of available by . @@ -89,7 +89,7 @@ public void Advance(int bytesWritten) } /// - /// Commits all outstanding written data to the underlying so they can be read + /// Commits all outstanding written data to the underlying so they can be read /// and seals the so no more data can be committed. /// /// @@ -101,7 +101,7 @@ public void Commit() } /// - /// Signals the data is available. + /// Signals the data is available. /// Will if necessary. /// /// A task that completes when the data is fully flushed. diff --git a/tests/Benchmarks/Helpers/Connection.cs b/tests/Benchmarks/Helpers/Connection.cs index ed653ec1aab..c6cb6875d77 100644 --- a/tests/Benchmarks/Helpers/Connection.cs +++ b/tests/Benchmarks/Helpers/Connection.cs @@ -1,26 +1,26 @@ namespace System.IO.Pipelines { - public class PipelineConnection : IPipelineConnection + public class PipeConnection : IPipeConnection { - public PipelineConnection(PipelineFactory factory) + public PipeConnection(PipeFactory factory) { Input = factory.Create(); Output = factory.Create(); } - IPipelineReader IPipelineConnection.Input => Input; - IPipelineWriter IPipelineConnection.Output => Output; + IPipeReader IPipeConnection.Input => Input.Reader; + IPipeWriter IPipeConnection.Output => Output.Writer; - public Pipe Input { get; } + public IPipe Input { get; } - public Pipe Output { get; } + public IPipe Output { get; } public void Dispose() { - Input.CompleteReader(); - Input.CompleteWriter(); - Output.CompleteReader(); - Output.CompleteWriter(); + Input.Reader.Complete(); + Input.Writer.Complete(); + Output.Reader.Complete(); + Output.Writer.Complete(); } } } \ No newline at end of file diff --git a/tests/Benchmarks/Helpers/Listener.cs b/tests/Benchmarks/Helpers/Listener.cs index e6a7d91ded9..427945aa829 100644 --- a/tests/Benchmarks/Helpers/Listener.cs +++ b/tests/Benchmarks/Helpers/Listener.cs @@ -5,18 +5,18 @@ namespace System.IO.Pipelines.Samples { public class FakeListener { - private readonly List _connections = new List(); + private readonly List _connections = new List(); private Task[] _connectionTasks; - public FakeListener(PipelineFactory factory, int concurrentConnections) + public FakeListener(PipeFactory factory, int concurrentConnections) { for (int i = 0; i < concurrentConnections; i++) { - _connections.Add(new PipelineConnection(factory)); + _connections.Add(new PipeConnection(factory)); } } - public void OnConnection(Func callback) + public void OnConnection(Func callback) { _connectionTasks = new Task[_connections.Count]; for (int i = 0; i < _connections.Count; i++) @@ -31,7 +31,7 @@ public Task ExecuteRequestAsync(byte[] request) for (int i = 0; i < _connections.Count; i++) { var connection = _connections[i]; - tasks[i] = connection.Input.WriteAsync(request); + tasks[i] = connection.Input.Writer.WriteAsync(request); } return Task.WhenAll(tasks); @@ -41,7 +41,7 @@ public void Dispose() { foreach (var c in _connections) { - c.Input.CompleteWriter(); + c.Input.Writer.Complete(); } Task.WaitAll(_connectionTasks); diff --git a/tests/Benchmarks/Helpers/Server.cs b/tests/Benchmarks/Helpers/Server.cs index ad994e1348f..951d2f71650 100644 --- a/tests/Benchmarks/Helpers/Server.cs +++ b/tests/Benchmarks/Helpers/Server.cs @@ -16,7 +16,7 @@ public static class RawInMemoryHttpServer { public static void RunSingleSegmentParser(int numberOfRequests, int concurrentConnections, byte[] requestPayload, Action writeResponse) { - var factory = new PipelineFactory(); + var factory = new PipeFactory(); var listener = new FakeListener(factory, concurrentConnections); listener.OnConnection(async connection => @@ -76,7 +76,7 @@ public static void RunSingleSegmentParser(int numberOfRequests, int concurrentCo public static void Run(int numberOfRequests, int concurrentConnections, byte[] requestPayload, Action writeResponse) { - var factory = new PipelineFactory(); + var factory = new PipeFactory(); var listener = new FakeListener(factory, concurrentConnections); listener.OnConnection(async connection => { diff --git a/tests/System.Buffers.Experimental.Tests/BasicUnitTests.cs b/tests/System.Buffers.Experimental.Tests/BasicUnitTests.cs index 645059aae58..a62af5679e3 100644 --- a/tests/System.Buffers.Experimental.Tests/BasicUnitTests.cs +++ b/tests/System.Buffers.Experimental.Tests/BasicUnitTests.cs @@ -6,7 +6,7 @@ namespace System.Buffers.Tests { public class NativeBufferPoolTests { - [Fact] + [Fact(Skip = "ReadOnlyBytesTests are flaky")] public void BasicsWork() { var pool = NativeBufferPool.Shared; var buffer = pool.Rent(10); diff --git a/tests/System.Buffers.Experimental.Tests/MemoryTests.cs b/tests/System.Buffers.Experimental.Tests/MemoryTests.cs index 48b7f9fde59..fe25a4f1a88 100644 --- a/tests/System.Buffers.Experimental.Tests/MemoryTests.cs +++ b/tests/System.Buffers.Experimental.Tests/MemoryTests.cs @@ -7,7 +7,7 @@ namespace System.Slices.Tests { public class MemoryTests { - [Fact] + [Fact(Skip = "ReadOnlyBytesTests are flaky")] public void SimpleTestS() { using(var owned = new OwnedNativeMemory(1024)) { @@ -43,7 +43,7 @@ public void SimpleTestS() } } - [Fact] + [Fact(Skip = "ReadOnlyBytesTests are flaky")] public void NativeMemoryLifetime() { Memory copyStoredForLater; @@ -81,7 +81,7 @@ public void NativeMemoryLifetime() }); } - [Fact] + [Fact(Skip = "ReadOnlyBytesTests are flaky")] public unsafe void PinnedArrayMemoryLifetime() { Memory copyStoredForLater; diff --git a/tests/System.IO.Pipelines.Tests/BackpressureTests.cs b/tests/System.IO.Pipelines.Tests/BackpressureTests.cs index d290fcc7b98..dbe9a2dcad8 100644 --- a/tests/System.IO.Pipelines.Tests/BackpressureTests.cs +++ b/tests/System.IO.Pipelines.Tests/BackpressureTests.cs @@ -8,14 +8,14 @@ namespace System.IO.Pipelines.Tests { public class BackpressureTests : IDisposable { - private PipelineFactory _pipelineFactory; + private PipeFactory _pipeFactory; - private Pipe _pipe; + private IPipe _pipe; public BackpressureTests() { - _pipelineFactory = new PipelineFactory(); - _pipe = _pipelineFactory.Create(new PipeOptions + _pipeFactory = new PipeFactory(); + _pipe = _pipeFactory.Create(new PipeOptions { MaximumSizeLow = 32, MaximumSizeHigh = 64 @@ -24,15 +24,15 @@ public BackpressureTests() public void Dispose() { - _pipe.CompleteWriter(); - _pipe.CompleteReader(); - _pipelineFactory?.Dispose(); + _pipe.Writer.Complete(); + _pipe.Reader.Complete(); + _pipeFactory?.Dispose(); } [Fact] public void FlushAsyncReturnsCompletedTaskWhenSizeLessThenLimit() { - var writableBuffer = _pipe.Alloc(32); + var writableBuffer = _pipe.Writer.Alloc(32); writableBuffer.Advance(32); var flushAsync = writableBuffer.FlushAsync(); Assert.True(flushAsync.IsCompleted); @@ -42,7 +42,7 @@ public void FlushAsyncReturnsCompletedTaskWhenSizeLessThenLimit() [Fact] public void FlushAsyncReturnsNonCompletedSizeWhenCommitOverTheLimit() { - var writableBuffer = _pipe.Alloc(64); + var writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); Assert.False(flushAsync.IsCompleted); @@ -51,15 +51,15 @@ public void FlushAsyncReturnsNonCompletedSizeWhenCommitOverTheLimit() [Fact] public void FlushAsyncAwaitableCompletesWhenReaderAdvancesUnderLow() { - var writableBuffer = _pipe.Alloc(64); + var writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); Assert.False(flushAsync.IsCompleted); - var result = _pipe.ReadAsync().GetAwaiter().GetResult(); + var result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult(); var consumed = result.Buffer.Move(result.Buffer.Start, 33); - _pipe.AdvanceReader(consumed, consumed); + _pipe.Reader.Advance(consumed, consumed); Assert.True(flushAsync.IsCompleted); Assert.True(flushAsync.GetResult()); @@ -67,16 +67,16 @@ public void FlushAsyncAwaitableCompletesWhenReaderAdvancesUnderLow() [Fact] public void FlushAsyncAwaitableDoesNotCompletesWhenReaderAdvancesUnderHight() - { - var writableBuffer = _pipe.Alloc(64); + { + var writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); Assert.False(flushAsync.IsCompleted); - var result = _pipe.ReadAsync().GetAwaiter().GetResult(); + var result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult(); var consumed = result.Buffer.Move(result.Buffer.Start, 32); - _pipe.AdvanceReader(consumed, consumed); + _pipe.Reader.Advance(consumed, consumed); Assert.False(flushAsync.IsCompleted); } @@ -84,9 +84,9 @@ public void FlushAsyncAwaitableDoesNotCompletesWhenReaderAdvancesUnderHight() [Fact] public async Task FlushAsyncThrowsIfReaderCompletedWithException() { - _pipe.CompleteReader(new InvalidOperationException("Reader failed")); + _pipe.Reader.Complete(new InvalidOperationException("Reader failed")); - var writableBuffer = _pipe.Alloc(64); + var writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); var invalidOperationException = await Assert.ThrowsAsync(async () => await writableBuffer.FlushAsync()); Assert.Equal("Reader failed", invalidOperationException.Message); @@ -97,13 +97,13 @@ public async Task FlushAsyncThrowsIfReaderCompletedWithException() [Fact] public void FlushAsyncReturnsFalseIfReaderCompletes() { - var writableBuffer = _pipe.Alloc(64); + var writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); Assert.False(flushAsync.IsCompleted); - _pipe.CompleteReader(); + _pipe.Reader.Complete(); Assert.True(flushAsync.IsCompleted); Assert.False(flushAsync.GetResult()); @@ -112,20 +112,20 @@ public void FlushAsyncReturnsFalseIfReaderCompletes() [Fact] public void FlushAsyncAwaitableResetsOnCommit() { - var writableBuffer = _pipe.Alloc(64); + var writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); Assert.False(flushAsync.IsCompleted); - var result = _pipe.ReadAsync().GetAwaiter().GetResult(); + var result = _pipe.Reader.ReadAsync().GetAwaiter().GetResult(); var consumed = result.Buffer.Move(result.Buffer.Start, 33); - _pipe.AdvanceReader(consumed, consumed); + _pipe.Reader.Advance(consumed, consumed); Assert.True(flushAsync.IsCompleted); Assert.True(flushAsync.GetResult()); - writableBuffer = _pipe.Alloc(64); + writableBuffer = _pipe.Writer.Alloc(64); writableBuffer.Advance(64); flushAsync = writableBuffer.FlushAsync(); diff --git a/tests/System.IO.Pipelines.Tests/PipeLengthTests.cs b/tests/System.IO.Pipelines.Tests/PipeLengthTests.cs index 05860c14d55..0384098c71b 100644 --- a/tests/System.IO.Pipelines.Tests/PipeLengthTests.cs +++ b/tests/System.IO.Pipelines.Tests/PipeLengthTests.cs @@ -8,27 +8,27 @@ namespace System.IO.Pipelines.Tests { public class PipeLengthTests : IDisposable { - private PipelineFactory _pipelineFactory; + private PipeFactory _pipeFactory; private Pipe _pipe; public PipeLengthTests() { - _pipelineFactory = new PipelineFactory(); - _pipe = _pipelineFactory.Create(); + _pipeFactory = new PipeFactory(); + _pipe = (Pipe)_pipeFactory.Create(); } public void Dispose() { - _pipe.CompleteWriter(); - _pipe.CompleteReader(); - _pipelineFactory?.Dispose(); + _pipe.Writer.Complete(); + _pipe.Reader.Complete(); + _pipeFactory?.Dispose(); } [Fact] public void LengthCorrectAfterAllocAdvanceCommit() { - var writableBuffer = _pipe.Alloc(100); + var writableBuffer = _pipe.Writer.Alloc(100); writableBuffer.Advance(10); writableBuffer.Commit(); @@ -38,7 +38,7 @@ public void LengthCorrectAfterAllocAdvanceCommit() [Fact] public void LengthCorrectAfterAlloc0AdvanceCommit() { - var writableBuffer = _pipe.Alloc(); + var writableBuffer = _pipe.Writer.Alloc(); writableBuffer.Ensure(10); writableBuffer.Advance(10); writableBuffer.Commit(); @@ -49,7 +49,7 @@ public void LengthCorrectAfterAlloc0AdvanceCommit() [Fact] public void LengthIncreasesAfterAppend() { - var writableBuffer = _pipe.Alloc(); + var writableBuffer = _pipe.Writer.Alloc(); writableBuffer.Append(BufferUtilities.CreateBuffer(1, 2, 3)); Assert.Equal(0, _pipe.Length); writableBuffer.Commit(); @@ -60,7 +60,7 @@ public void LengthIncreasesAfterAppend() [Fact] public void LengthIncreasesAfterAdvanceAndAppend() { - var writableBuffer = _pipe.Alloc(10); + var writableBuffer = _pipe.Writer.Alloc(10); writableBuffer.Advance(4); writableBuffer.Append(BufferUtilities.CreateBuffer(1, 2, 3)); Assert.Equal(0, _pipe.Length); @@ -72,14 +72,14 @@ public void LengthIncreasesAfterAdvanceAndAppend() [Fact] public void LengthDecreasedAfterReadAdvanceConsume() { - var writableBuffer = _pipe.Alloc(100); + var writableBuffer = _pipe.Writer.Alloc(100); writableBuffer.Advance(10); writableBuffer.Commit(); writableBuffer.FlushAsync(); - var result = _pipe.ReadAsync().GetResult(); + var result = _pipe.Reader.ReadAsync().GetResult(); var consumed = result.Buffer.Slice(5).Start; - _pipe.AdvanceReader(consumed, consumed); + _pipe.Reader.Advance(consumed, consumed); Assert.Equal(5, _pipe.Length); } @@ -87,14 +87,13 @@ public void LengthDecreasedAfterReadAdvanceConsume() [Fact] public void LengthNotChangeAfterReadAdvanceExamine() { - var writableBuffer = _pipe.Alloc(100); + var writableBuffer = _pipe.Writer.Alloc(100); writableBuffer.Advance(10); writableBuffer.Commit(); writableBuffer.FlushAsync(); - var result = _pipe.ReadAsync().GetResult(); - var consumed = result.Buffer.Slice(5).Start; - _pipe.AdvanceReader(result.Buffer.Start, result.Buffer.End); + var result = _pipe.Reader.ReadAsync().GetResult(); + _pipe.Reader.Advance(result.Buffer.Start, result.Buffer.End); Assert.Equal(10, _pipe.Length); } @@ -105,7 +104,7 @@ public void ByteByByteTest() WritableBuffer writableBuffer; for (int i = 1; i <= 1024 * 1024; i++) { - writableBuffer = _pipe.Alloc(100); + writableBuffer = _pipe.Writer.Alloc(100); writableBuffer.Advance(1); writableBuffer.Commit(); @@ -115,9 +114,9 @@ public void ByteByByteTest() for (int i = 1024 * 1024 - 1; i >= 0; i--) { - var result = _pipe.ReadAsync().GetResult(); + var result = _pipe.Reader.ReadAsync().GetResult(); var consumed = result.Buffer.Slice(1).Start; - _pipe.AdvanceReader(consumed, consumed); + _pipe.Reader.Advance(consumed, consumed); Assert.Equal(i, _pipe.Length); } diff --git a/tests/System.IO.Pipelines.Tests/PipelineReaderWriterFacts.cs b/tests/System.IO.Pipelines.Tests/PipelineReaderWriterFacts.cs index 0b7b98b12ed..fe4f658cf13 100644 --- a/tests/System.IO.Pipelines.Tests/PipelineReaderWriterFacts.cs +++ b/tests/System.IO.Pipelines.Tests/PipelineReaderWriterFacts.cs @@ -13,27 +13,27 @@ namespace System.IO.Pipelines.Tests { public class PipelineReaderWriterFacts : IDisposable { - private Pipe _pipe; - private PipelineFactory _pipelineFactory; + private IPipe _pipe; + private PipeFactory _pipeFactory; public PipelineReaderWriterFacts() { - _pipelineFactory = new PipelineFactory(); - _pipe = _pipelineFactory.Create(); + _pipeFactory = new PipeFactory(); + _pipe = _pipeFactory.Create(); } public void Dispose() { - _pipe.CompleteWriter(); - _pipe.CompleteReader(); - _pipelineFactory?.Dispose(); + _pipe.Writer.Complete(); + _pipe.Reader.Complete(); + _pipeFactory?.Dispose(); } [Fact] public async Task ReaderShouldNotGetUnflushedBytesWhenOverflowingSegments() { // Fill the block with stuff leaving 5 bytes at the end - var buffer = _pipe.Alloc(1); + var buffer = _pipe.Writer.Alloc(1); var len = buffer.Memory.Length; // Fill the buffer with garbage @@ -44,7 +44,7 @@ public async Task ReaderShouldNotGetUnflushedBytesWhenOverflowingSegments() await buffer.FlushAsync(); // Write 10 and flush - buffer = _pipe.Alloc(); + buffer = _pipe.Writer.Alloc(); buffer.WriteLittleEndian(10); // Write 9 @@ -54,107 +54,107 @@ public async Task ReaderShouldNotGetUnflushedBytesWhenOverflowingSegments() buffer.WriteLittleEndian(8); // Make sure we don't see it yet - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var reader = result.Buffer; Assert.Equal(len - 5, reader.Length); // Don't move - _pipe.Advance(reader.End); + _pipe.Reader.Advance(reader.End); // Now flush await buffer.FlushAsync(); - reader = (await _pipe.ReadAsync()).Buffer; + reader = (await _pipe.Reader.ReadAsync()).Buffer; Assert.Equal(12, reader.Length); Assert.Equal(10, reader.ReadLittleEndian()); Assert.Equal(9, reader.Slice(4).ReadLittleEndian()); Assert.Equal(8, reader.Slice(8).ReadLittleEndian()); - _pipe.AdvanceReader(reader.Start, reader.Start); + _pipe.Reader.Advance(reader.Start, reader.Start); } [Fact] public async Task ReaderShouldNotGetUnflushedBytes() { // Write 10 and flush - var buffer = _pipe.Alloc(); + var buffer = _pipe.Writer.Alloc(); buffer.WriteLittleEndian(10); await buffer.FlushAsync(); // Write 9 - buffer = _pipe.Alloc(); + buffer = _pipe.Writer.Alloc(); buffer.WriteLittleEndian(9); // Write 8 buffer.WriteLittleEndian(8); // Make sure we don't see it yet - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var reader = result.Buffer; Assert.Equal(4, reader.Length); Assert.Equal(10, reader.ReadLittleEndian()); // Don't move - _pipe.Advance(reader.Start); + _pipe.Reader.Advance(reader.Start); // Now flush await buffer.FlushAsync(); - reader = (await _pipe.ReadAsync()).Buffer; + reader = (await _pipe.Reader.ReadAsync()).Buffer; Assert.Equal(12, reader.Length); Assert.Equal(10, reader.ReadLittleEndian()); Assert.Equal(9, reader.Slice(4).ReadLittleEndian()); Assert.Equal(8, reader.Slice(8).ReadLittleEndian()); - _pipe.AdvanceReader(reader.Start, reader.Start); + _pipe.Reader.Advance(reader.Start, reader.Start); } [Fact] public async Task ReaderShouldNotGetUnflushedBytesWithAppend() { // Write 10 and flush - var buffer = _pipe.Alloc(); + var buffer = _pipe.Writer.Alloc(); buffer.WriteLittleEndian(10); await buffer.FlushAsync(); // Write Hello to another pipeline and get the buffer var bytes = Encoding.ASCII.GetBytes("Hello"); - var c2 = _pipelineFactory.Create(); - await c2.WriteAsync(bytes); - var result = await c2.ReadAsync(); + var c2 = _pipeFactory.Create(); + await c2.Writer.WriteAsync(bytes); + var result = await c2.Reader.ReadAsync(); var c2Buffer = result.Buffer; Assert.Equal(bytes.Length, c2Buffer.Length); // Write 9 to the buffer - buffer = _pipe.Alloc(); + buffer = _pipe.Writer.Alloc(); buffer.WriteLittleEndian(9); // Append the data from the other pipeline buffer.Append(c2Buffer); // Mark it as consumed - c2.Advance(c2Buffer.End); + c2.Reader.Advance(c2Buffer.End); // Now read and make sure we only see the comitted data - result = await _pipe.ReadAsync(); + result = await _pipe.Reader.ReadAsync(); var reader = result.Buffer; Assert.Equal(4, reader.Length); Assert.Equal(10, reader.ReadLittleEndian()); // Consume nothing - _pipe.Advance(reader.Start); + _pipe.Reader.Advance(reader.Start); // Flush the second set of writes await buffer.FlushAsync(); - reader = (await _pipe.ReadAsync()).Buffer; + reader = (await _pipe.Reader.ReadAsync()).Buffer; // int, int, "Hello" Assert.Equal(13, reader.Length); @@ -162,7 +162,7 @@ public async Task ReaderShouldNotGetUnflushedBytesWithAppend() Assert.Equal(9, reader.Slice(4).ReadLittleEndian()); Assert.Equal("Hello", reader.Slice(8).GetUtf8String()); - _pipe.AdvanceReader(reader.Start, reader.Start); + _pipe.Reader.Advance(reader.Start, reader.Start); } [Fact] @@ -170,8 +170,8 @@ public async Task WritingDataMakesDataReadableViaPipeline() { var bytes = Encoding.ASCII.GetBytes("Hello World"); - await _pipe.WriteAsync(bytes); - var result = await _pipe.ReadAsync(); + await _pipe.Writer.WriteAsync(bytes); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; Assert.Equal(11, buffer.Length); @@ -180,7 +180,7 @@ public async Task WritingDataMakesDataReadableViaPipeline() buffer.First.Span.CopyTo(array); Assert.Equal("Hello World", Encoding.ASCII.GetString(array)); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] @@ -188,8 +188,8 @@ public async Task AdvanceEmptyBufferAfterWritingResetsAwaitable() { var bytes = Encoding.ASCII.GetBytes("Hello World"); - await _pipe.WriteAsync(bytes); - var result = await _pipe.ReadAsync(); + await _pipe.Writer.WriteAsync(bytes); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; Assert.Equal(11, buffer.Length); @@ -198,53 +198,53 @@ public async Task AdvanceEmptyBufferAfterWritingResetsAwaitable() buffer.First.Span.CopyTo(array); Assert.Equal("Hello World", Encoding.ASCII.GetString(array)); - _pipe.Advance(buffer.End); + _pipe.Reader.Advance(buffer.End); // Now write 0 and advance 0 - await _pipe.WriteAsync(Span.Empty); - result = await _pipe.ReadAsync(); - _pipe.Advance(result.Buffer.End); + await _pipe.Writer.WriteAsync(Span.Empty); + result = await _pipe.Reader.ReadAsync(); + _pipe.Reader.Advance(result.Buffer.End); - var awaitable = _pipe.ReadAsync(); + var awaitable = _pipe.Reader.ReadAsync(); Assert.False(awaitable.IsCompleted); } [Fact] public async Task AdvanceShouldResetStateIfReadCancelled() { - _pipe.CancelPendingRead(); + _pipe.Reader.CancelPendingRead(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; - _pipe.Advance(buffer.End); + _pipe.Reader.Advance(buffer.End); Assert.False(result.IsCompleted); Assert.True(result.IsCancelled); Assert.True(buffer.IsEmpty); - var awaitable = _pipe.ReadAsync(); + var awaitable = _pipe.Reader.ReadAsync(); Assert.False(awaitable.IsCompleted); } [Fact] public async Task CancellingPendingReadBeforeReadAsync() { - _pipe.CancelPendingRead(); + _pipe.Reader.CancelPendingRead(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; - _pipe.Advance(buffer.End); + _pipe.Reader.Advance(buffer.End); Assert.False(result.IsCompleted); Assert.True(result.IsCancelled); Assert.True(buffer.IsEmpty); var bytes = Encoding.ASCII.GetBytes("Hello World"); - var output = _pipe.Alloc(); + var output = _pipe.Writer.Alloc(); output.Write(bytes); await output.FlushAsync(); - result = await _pipe.ReadAsync(); + result = await _pipe.Reader.ReadAsync(); buffer = result.Buffer; Assert.Equal(11, buffer.Length); @@ -254,18 +254,18 @@ public async Task CancellingPendingReadBeforeReadAsync() buffer.First.Span.CopyTo(array); Assert.Equal("Hello World", Encoding.ASCII.GetString(array)); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] public async Task CancellingBeforeAdvance() { var bytes = Encoding.ASCII.GetBytes("Hello World"); - var output = _pipe.Alloc(); + var output = _pipe.Writer.Alloc(); output.Write(bytes); await output.FlushAsync(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; Assert.Equal(11, buffer.Length); @@ -275,11 +275,11 @@ public async Task CancellingBeforeAdvance() buffer.First.Span.CopyTo(array); Assert.Equal("Hello World", Encoding.ASCII.GetString(array)); - _pipe.CancelPendingRead(); + _pipe.Reader.CancelPendingRead(); - _pipe.Advance(buffer.End); + _pipe.Reader.Advance(buffer.End); - var awaitable = _pipe.ReadAsync(); + var awaitable = _pipe.Reader.ReadAsync(); Assert.True(awaitable.IsCompleted); @@ -287,21 +287,21 @@ public async Task CancellingBeforeAdvance() Assert.True(result.IsCancelled); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] public async Task CancellingPendingAfterReadAsync() { var bytes = Encoding.ASCII.GetBytes("Hello World"); - var output = _pipe.Alloc(); + var output = _pipe.Writer.Alloc(); output.Write(bytes); var task = Task.Run(async () => { - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; - _pipe.Advance(buffer.End); + _pipe.Reader.Advance(buffer.End); Assert.False(result.IsCompleted); Assert.True(result.IsCancelled); @@ -309,7 +309,7 @@ public async Task CancellingPendingAfterReadAsync() await output.FlushAsync(); - result = await _pipe.ReadAsync(); + result = await _pipe.Reader.ReadAsync(); buffer = result.Buffer; Assert.Equal(11, buffer.Length); @@ -318,32 +318,32 @@ public async Task CancellingPendingAfterReadAsync() var array = new byte[11]; buffer.First.Span.CopyTo(array); Assert.Equal("Hello World", Encoding.ASCII.GetString(array)); - _pipe.AdvanceReader(result.Buffer.End, result.Buffer.End); + _pipe.Reader.Advance(result.Buffer.End, result.Buffer.End); - _pipe.CompleteReader(); + _pipe.Reader.Complete(); }); // Wait until reading starts to cancel the pending read await _pipe.ReadingStarted; - _pipe.CancelPendingRead(); + _pipe.Reader.CancelPendingRead(); await task; - _pipe.CompleteWriter(); + _pipe.Writer.Complete(); } [Fact] public async Task WriteAndCancellingPendingReadBeforeReadAsync() { var bytes = Encoding.ASCII.GetBytes("Hello World"); - var output = _pipe.Alloc(); + var output = _pipe.Writer.Alloc(); output.Write(bytes); await output.FlushAsync(); - _pipe.CancelPendingRead(); + _pipe.Reader.CancelPendingRead(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; Assert.False(result.IsCompleted); @@ -354,7 +354,7 @@ public async Task WriteAndCancellingPendingReadBeforeReadAsync() var array = new byte[11]; buffer.First.Span.CopyTo(array); Assert.Equal("Hello World", Encoding.ASCII.GetString(array)); - _pipe.AdvanceReader(buffer.End, buffer.End); + _pipe.Reader.Advance(buffer.End, buffer.End); } [Fact] @@ -363,7 +363,7 @@ public async Task ReadingCanBeCancelled() var cts = new CancellationTokenSource(); cts.Token.Register(() => { - _pipe.CompleteWriter(new OperationCanceledException(cts.Token)); + _pipe.Writer.Complete(new OperationCanceledException(cts.Token)); }); var ignore = Task.Run(async () => @@ -374,7 +374,7 @@ public async Task ReadingCanBeCancelled() await Assert.ThrowsAsync(async () => { - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; }); } @@ -387,12 +387,12 @@ public async Task HelloWorldAcrossTwoBlocks() // [padding..hello] -> [ world ] var paddingBytes = Enumerable.Repeat((byte)'a', blockSize - 5).ToArray(); var bytes = Encoding.ASCII.GetBytes("Hello World"); - var writeBuffer = _pipe.Alloc(); + var writeBuffer = _pipe.Writer.Alloc(); writeBuffer.Write(paddingBytes); writeBuffer.Write(bytes); await writeBuffer.FlushAsync(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; Assert.False(buffer.IsSingleSpan); var helloBuffer = buffer.Slice(blockSize - 5); @@ -403,7 +403,7 @@ public async Task HelloWorldAcrossTwoBlocks() memory.Add(m); } var spans = memory; - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); Assert.Equal(2, memory.Count); var helloBytes = new byte[spans[0].Length]; @@ -419,15 +419,15 @@ public async Task IndexOfNotFoundReturnsEnd() { var bytes = Encoding.ASCII.GetBytes("Hello World"); - await _pipe.WriteAsync(bytes); - var result = await _pipe.ReadAsync(); + await _pipe.Writer.WriteAsync(bytes); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; ReadableBuffer slice; ReadCursor cursor; Assert.False(buffer.TrySliceTo(10, out slice, out cursor)); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] @@ -440,18 +440,18 @@ public async Task FastPathIndexOfAcrossBlocks() // [padding..hello] -> [ world ] var paddingBytes = Enumerable.Repeat((byte)'a', blockSize - 5).ToArray(); var bytes = Encoding.ASCII.GetBytes("Hello World"); - var writeBuffer = _pipe.Alloc(); + var writeBuffer = _pipe.Writer.Alloc(); writeBuffer.Write(paddingBytes); writeBuffer.Write(bytes); await writeBuffer.FlushAsync(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; ReadableBuffer slice; ReadCursor cursor; Assert.False(buffer.TrySliceTo((byte)'R', out slice, out cursor)); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] @@ -462,12 +462,12 @@ public async Task SlowPathIndexOfAcrossBlocks() // [padding..hello] -> [ world ] var paddingBytes = Enumerable.Repeat((byte)'a', blockSize - 5).ToArray(); var bytes = Encoding.ASCII.GetBytes("Hello World"); - var writeBuffer = _pipe.Alloc(); + var writeBuffer = _pipe.Writer.Alloc(); writeBuffer.Write(paddingBytes); writeBuffer.Write(bytes); await writeBuffer.FlushAsync(); - var result = await _pipe.ReadAsync(); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; ReadableBuffer slice; ReadCursor cursor; @@ -479,19 +479,19 @@ public async Task SlowPathIndexOfAcrossBlocks() Assert.Equal("World", Encoding.ASCII.GetString(array)); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] public void AllocMoreThanPoolBlockSizeThrows() { - Assert.Throws(() => _pipe.Alloc(8192)); + Assert.Throws(() => _pipe.Writer.Alloc(8192)); } [Fact] public void ReadingStartedCompletesOnCompleteReader() { - _pipe.CompleteReader(); + _pipe.Reader.Complete(); Assert.True(_pipe.ReadingStarted.IsCompleted); } @@ -499,7 +499,7 @@ public void ReadingStartedCompletesOnCompleteReader() [Fact] public void ReadingStartedCompletesOnCallToReadAsync() { - _pipe.ReadAsync(); + _pipe.Reader.ReadAsync(); Assert.True(_pipe.ReadingStarted.IsCompleted); } @@ -507,19 +507,17 @@ public void ReadingStartedCompletesOnCallToReadAsync() [Fact] public void ThrowsOnReadAfterCompleteReader() { - _pipe.CompleteReader(); + _pipe.Reader.Complete(); - Assert.Throws(() => _pipe.ReadAsync()); + Assert.Throws(() => _pipe.Reader.ReadAsync()); } [Fact] public void ThrowsOnAllocAfterCompleteWriter() { + _pipe.Writer.Complete(); - _pipe.CompleteWriter(); - - Assert.Throws(() => _pipe.Alloc()); - _pipe.Commit(); + Assert.Throws(() => _pipe.Writer.Alloc()); } [Fact] @@ -527,17 +525,17 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce() { var pool = new DisposeTrackingOwnedMemory(new byte[1]); - using (var factory = new PipelineFactory(pool)) + using (var factory = new PipeFactory(pool)) { var readerWriter = factory.Create(); - await readerWriter.WriteAsync(new byte[] { 1 }); + await readerWriter.Writer.WriteAsync(new byte[] { 1 }); - readerWriter.CompleteWriter(); - readerWriter.CompleteReader(); + readerWriter.Writer.Complete(); + readerWriter.Reader.Complete(); Assert.Equal(1, pool.Disposed); - readerWriter.CompleteWriter(); - readerWriter.CompleteReader(); + readerWriter.Writer.Complete(); + readerWriter.Reader.Complete(); Assert.Equal(1, pool.Disposed); } } @@ -545,45 +543,45 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce() [Fact] public async Task CompleteReaderThrowsIfReadInProgress() { - await _pipe.WriteAsync(new byte[1]); - var result = await _pipe.ReadAsync(); + await _pipe.Writer.WriteAsync(new byte[1]); + var result = await _pipe.Reader.ReadAsync(); var buffer = result.Buffer; - Assert.Throws(() => _pipe.CompleteReader()); + Assert.Throws(() => _pipe.Reader.Complete()); - _pipe.AdvanceReader(buffer.Start, buffer.Start); + _pipe.Reader.Advance(buffer.Start, buffer.Start); } [Fact] public void CompleteWriterThrowsIfWriteInProgress() { - _pipe.Alloc(); + var buffer = _pipe.Writer.Alloc(); - Assert.Throws(() => _pipe.CompleteWriter()); + Assert.Throws(() => _pipe.Writer.Complete()); - _pipe.Commit(); + buffer.Commit(); } [Fact] public async Task ReadAsync_ThrowsIfWriterCompletedWithException() { - _pipe.CompleteWriter(new InvalidOperationException("Writer exception")); + _pipe.Writer.Complete(new InvalidOperationException("Writer exception")); - var invalidOperationException = await Assert.ThrowsAsync(async () => await _pipe.ReadAsync()); + var invalidOperationException = await Assert.ThrowsAsync(async () => await _pipe.Reader.ReadAsync()); Assert.Equal("Writer exception", invalidOperationException.Message); - invalidOperationException = await Assert.ThrowsAsync(async () => await _pipe.ReadAsync()); + invalidOperationException = await Assert.ThrowsAsync(async () => await _pipe.Reader.ReadAsync()); Assert.Equal("Writer exception", invalidOperationException.Message); } [Fact] public void FlushAsync_ReturnsCompletedTaskWhenMaxSizeIfZero() { - var writableBuffer = _pipe.Alloc(1); + var writableBuffer = _pipe.Writer.Alloc(1); writableBuffer.Advance(1); var flushTask = writableBuffer.FlushAsync(); Assert.True(flushTask.IsCompleted); - writableBuffer = _pipe.Alloc(1); + writableBuffer = _pipe.Writer.Alloc(1); writableBuffer.Advance(1); flushTask = writableBuffer.FlushAsync(); Assert.True(flushTask.IsCompleted); diff --git a/tests/System.IO.Pipelines.Tests/PipelineWriterFacts.cs b/tests/System.IO.Pipelines.Tests/PipelineWriterFacts.cs index 954e1b23bc6..ad8ac01ee9e 100644 --- a/tests/System.IO.Pipelines.Tests/PipelineWriterFacts.cs +++ b/tests/System.IO.Pipelines.Tests/PipelineWriterFacts.cs @@ -105,7 +105,7 @@ public async Task StreamAsPipelineWriterUsesUnderlyingWriter() } } - private class MyCustomStream : Stream, IPipelineWriter + private class MyCustomStream : Stream, IPipeWriter { private readonly Pipe _pipe = new Pipe(ArrayBufferPool.Instance); @@ -138,12 +138,12 @@ public override long Position public WritableBuffer Alloc(int minimumSize = 0) { - return _pipe.Alloc(minimumSize); + return _pipe.Writer.Alloc(minimumSize); } public void Complete(Exception exception = null) { - _pipe.CompleteWriter(exception); + _pipe.Writer.Complete(exception); } public override void Flush() @@ -175,8 +175,8 @@ protected override void Dispose(bool disposing) { base.Dispose(disposing); - _pipe.CompleteReader(); - _pipe.CompleteWriter(); + _pipe.Reader.Complete(); + _pipe.Writer.Complete(); } } } diff --git a/tests/System.IO.Pipelines.Tests/Properties/AssemblyInfo.cs b/tests/System.IO.Pipelines.Tests/Properties/AssemblyInfo.cs index 10bd0e99562..b1d442f10c6 100644 --- a/tests/System.IO.Pipelines.Tests/Properties/AssemblyInfo.cs +++ b/tests/System.IO.Pipelines.Tests/Properties/AssemblyInfo.cs @@ -18,3 +18,5 @@ // The following GUID is for the ID of the typelib if this project is exposed to COM [assembly: Guid("b9967782-565b-4b0b-97b9-043e35022674")] + +[assembly: CollectionBehavior(DisableTestParallelization = true)] diff --git a/tests/System.IO.Pipelines.Tests/ReadableBufferFacts.cs b/tests/System.IO.Pipelines.Tests/ReadableBufferFacts.cs index 06361e9238c..8289ba3a51f 100644 --- a/tests/System.IO.Pipelines.Tests/ReadableBufferFacts.cs +++ b/tests/System.IO.Pipelines.Tests/ReadableBufferFacts.cs @@ -19,7 +19,7 @@ public class ReadableBufferFacts [Fact] public async Task TestIndexOfWorksForAllLocations() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); const int Size = 5 * 4032; // multiple blocks @@ -28,7 +28,7 @@ public async Task TestIndexOfWorksForAllLocations() byte[] data = new byte[512]; for (int i = 0; i < data.Length; i++) data[i] = 42; int totalBytes = 0; - var writeBuffer = readerWriter.Alloc(); + var writeBuffer = readerWriter.Writer.Alloc(); for (int i = 0; i < Size / data.Length; i++) { writeBuffer.Write(data); @@ -37,7 +37,7 @@ public async Task TestIndexOfWorksForAllLocations() await writeBuffer.FlushAsync(); // now read it back - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var readBuffer = result.Buffer; Assert.False(readBuffer.IsSingleSpan); Assert.Equal(totalBytes, readBuffer.Length); @@ -48,7 +48,7 @@ public async Task TestIndexOfWorksForAllLocations() [Fact] public async Task EqualsDetectsDeltaForAllLocations() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); @@ -58,12 +58,12 @@ public async Task EqualsDetectsDeltaForAllLocations() var rand = new Random(12345); rand.NextBytes(data); - var writeBuffer = readerWriter.Alloc(); + var writeBuffer = readerWriter.Writer.Alloc(); writeBuffer.Write(data); await writeBuffer.FlushAsync(); // now read it back - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var readBuffer = result.Buffer; Assert.False(readBuffer.IsSingleSpan); Assert.Equal(data.Length, readBuffer.Length); @@ -103,17 +103,17 @@ private void EqualsDetectsDeltaForAllLocations(ReadableBuffer slice, byte[] expe [Fact] public async Task GetUInt64GivesExpectedValues() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var writeBuffer = readerWriter.Alloc(); + var writeBuffer = readerWriter.Writer.Alloc(); writeBuffer.Ensure(50); writeBuffer.Advance(50); // not even going to pretend to write data here - we're going to cheat await writeBuffer.FlushAsync(); // by overwriting the buffer in-situ // now read it back - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var readBuffer = result.Buffer; ReadUInt64GivesExpectedValues(ref readBuffer); @@ -128,16 +128,16 @@ public async Task GetUInt64GivesExpectedValues() [InlineData("\thell o ", "hell o ")] public async Task TrimStartTrimsWhitespaceAtStart(string input, string expected) { - using (var readerWriter = new PipelineFactory()) + using (var readerWriter = new PipeFactory()) { var connection = readerWriter.Create(); - var writeBuffer = connection.Alloc(); + var writeBuffer = connection.Writer.Alloc(); var bytes = Encoding.ASCII.GetBytes(input); writeBuffer.Write(bytes); await writeBuffer.FlushAsync(); - var result = await connection.ReadAsync(); + var result = await connection.Reader.ReadAsync(); var buffer = result.Buffer; var trimmed = buffer.TrimStart(); var outputBytes = trimmed.ToArray(); @@ -154,16 +154,16 @@ public async Task TrimStartTrimsWhitespaceAtStart(string input, string expected) [InlineData(" hell o\t", " hell o")] public async Task TrimEndTrimsWhitespaceAtEnd(string input, string expected) { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var writeBuffer = readerWriter.Alloc(); + var writeBuffer = readerWriter.Writer.Alloc(); var bytes = Encoding.ASCII.GetBytes(input); writeBuffer.Write(bytes); await writeBuffer.FlushAsync(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var buffer = result.Buffer; var trimmed = buffer.TrimEnd(); var outputBytes = trimmed.ToArray(); @@ -181,16 +181,16 @@ public async Task TrySliceToSpan(string input, string sliceTo, string expected) { var sliceToBytes = Encoding.UTF8.GetBytes(sliceTo); - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var writeBuffer = readerWriter.Alloc(); + var writeBuffer = readerWriter.Writer.Alloc(); var bytes = Encoding.UTF8.GetBytes(input); writeBuffer.Write(bytes); await writeBuffer.FlushAsync(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var buffer = result.Buffer; ReadableBuffer slice; ReadCursor cursor; @@ -297,10 +297,10 @@ public async Task Split(string input, char delimiter) // note: different expectation to string.Split; empty has 0 outputs var expected = input == "" ? new string[0] : input.Split(delimiter); - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var output = readerWriter.Alloc(); + var output = readerWriter.Writer.Alloc(); output.Append(input, EncodingData.InvariantUtf8); var readable = output.AsReadableBuffer(); @@ -335,10 +335,10 @@ public async Task ReadTWorksAgainstSimpleBuffers() byte[] chunk = { 0, 1, 2, 3, 4, 5, 6, 7 }; var span = new Span(chunk); - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var output = readerWriter.Alloc(); + var output = readerWriter.Writer.Alloc(); output.Write(span); var readable = output.AsReadableBuffer(); Assert.True(readable.IsSingleSpan); @@ -359,10 +359,10 @@ public async Task ReadTWorksAgainstSimpleBuffers() [Fact] public async Task ReadTWorksAgainstMultipleBuffers() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var output = readerWriter.Alloc(); + var output = readerWriter.Writer.Alloc(); // we're going to try to force 3 buffers for 8 bytes output.Write(new byte[] { 0, 1, 2 }); @@ -402,14 +402,14 @@ public async Task ReadTWorksAgainstMultipleBuffers() [Fact] public async Task CopyToAsync() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var output = readerWriter.Alloc(); + var output = readerWriter.Writer.Alloc(); output.Append("Hello World", EncodingData.InvariantUtf8); await output.FlushAsync(); var ms = new MemoryStream(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var rb = result.Buffer; await rb.CopyToAsync(ms); ms.Position = 0; @@ -424,14 +424,14 @@ public async Task CopyToAsync() public async Task CopyToAsyncNativeMemory() { using (var pool = new NativePool()) - using (var factory = new PipelineFactory(pool)) + using (var factory = new PipeFactory(pool)) { var readerWriter = factory.Create(); - var output = readerWriter.Alloc(); + var output = readerWriter.Writer.Alloc(); output.Append("Hello World", EncodingData.InvariantUtf8); await output.FlushAsync(); var ms = new MemoryStream(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var rb = result.Buffer; await rb.CopyToAsync(ms); ms.Position = 0; @@ -484,10 +484,10 @@ public void CanUseArrayBasedReadableBuffers() [Fact] public void ReadableBufferSequenceWorks() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var output = readerWriter.Alloc(); + var output = readerWriter.Writer.Alloc(); { // empty buffer diff --git a/tests/System.IO.Pipelines.Tests/ReadableBufferReaderFacts.cs b/tests/System.IO.Pipelines.Tests/ReadableBufferReaderFacts.cs index 2fcd75994df..f6c0dcdfacd 100644 --- a/tests/System.IO.Pipelines.Tests/ReadableBufferReaderFacts.cs +++ b/tests/System.IO.Pipelines.Tests/ReadableBufferReaderFacts.cs @@ -37,16 +37,16 @@ public void PeekReturnsMinuOneByteInTheEnd() [Fact] public async Task TakeTraversesSegments() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var w = readerWriter.Alloc(); + var w = readerWriter.Writer.Alloc(); w.Append(ReadableBuffer.Create(new byte[] { 1 }, 0, 1)); w.Append(ReadableBuffer.Create(new byte[] { 2 }, 0, 1)); w.Append(ReadableBuffer.Create(new byte[] { 3 }, 0, 1)); await w.FlushAsync(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var buffer = result.Buffer; var reader = new ReadableBufferReader(buffer); @@ -60,15 +60,15 @@ public async Task TakeTraversesSegments() [Fact] public async Task PeekTraversesSegments() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var w = readerWriter.Alloc(); + var w = readerWriter.Writer.Alloc(); w.Append(ReadableBuffer.Create(new byte[] { 1 }, 0, 1)); w.Append(ReadableBuffer.Create(new byte[] { 2 }, 0, 1)); await w.FlushAsync(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var buffer = result.Buffer; var reader = new ReadableBufferReader(buffer); @@ -83,15 +83,15 @@ public async Task PeekTraversesSegments() [Fact] public async Task PeekWorkesWithEmptySegments() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var readerWriter = factory.Create(); - var w = readerWriter.Alloc(); + var w = readerWriter.Writer.Alloc(); w.Append(ReadableBuffer.Create(new byte[] { 0 }, 0, 0)); w.Append(ReadableBuffer.Create(new byte[] { 1 }, 0, 1)); await w.FlushAsync(); - var result = await readerWriter.ReadAsync(); + var result = await readerWriter.Reader.ReadAsync(); var buffer = result.Buffer; var reader = new ReadableBufferReader(buffer); diff --git a/tests/System.IO.Pipelines.Tests/SchedulerFacts.cs b/tests/System.IO.Pipelines.Tests/SchedulerFacts.cs index 28a1f52b714..62894fcb593 100644 --- a/tests/System.IO.Pipelines.Tests/SchedulerFacts.cs +++ b/tests/System.IO.Pipelines.Tests/SchedulerFacts.cs @@ -13,7 +13,7 @@ public class SchedulerFacts [Fact] public async Task ReadAsyncCallbackRunsOnReaderScheduler() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { using (var scheduler = new ThreadScheduler()) { @@ -26,20 +26,20 @@ public async Task ReadAsyncCallbackRunsOnReaderScheduler() { var oid = Thread.CurrentThread.ManagedThreadId; - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); Assert.NotEqual(oid, Thread.CurrentThread.ManagedThreadId); Assert.Equal(Thread.CurrentThread.ManagedThreadId, scheduler.Thread.ManagedThreadId); - pipe.AdvanceReader(result.Buffer.End, result.Buffer.End); + pipe.Reader.Advance(result.Buffer.End, result.Buffer.End); - pipe.CompleteReader(); + pipe.Reader.Complete(); }; var reading = doRead(); - var buffer = pipe.Alloc(); + var buffer = pipe.Writer.Alloc(); buffer.Write(Encoding.UTF8.GetBytes("Hello World")); await buffer.FlushAsync(); @@ -51,7 +51,7 @@ public async Task ReadAsyncCallbackRunsOnReaderScheduler() [Fact] public async Task FlushCallbackRunsOnWriterScheduler() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { using (var scheduler = new ThreadScheduler()) { @@ -62,7 +62,7 @@ public async Task FlushCallbackRunsOnWriterScheduler() WriterScheduler = scheduler }); - var writableBuffer = pipe.Alloc(64); + var writableBuffer = pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); @@ -76,18 +76,18 @@ public async Task FlushCallbackRunsOnWriterScheduler() Assert.NotEqual(oid, Thread.CurrentThread.ManagedThreadId); - pipe.CompleteWriter(); + pipe.Writer.Complete(); Assert.Equal(Thread.CurrentThread.ManagedThreadId, scheduler.Thread.ManagedThreadId); }; var writing = doWrite(); - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); - pipe.AdvanceReader(result.Buffer.End, result.Buffer.End); + pipe.Reader.Advance(result.Buffer.End, result.Buffer.End); - pipe.CompleteReader(); + pipe.Reader.Complete(); await writing; } @@ -97,7 +97,7 @@ public async Task FlushCallbackRunsOnWriterScheduler() [Fact] public async Task DefaultReaderSchedulerRunsInline() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var pipe = factory.Create(); @@ -105,24 +105,24 @@ public async Task DefaultReaderSchedulerRunsInline() Func doRead = async () => { - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); Assert.Equal(Thread.CurrentThread.ManagedThreadId, id); - pipe.AdvanceReader(result.Buffer.End, result.Buffer.End); + pipe.Reader.Advance(result.Buffer.End, result.Buffer.End); - pipe.CompleteReader(); + pipe.Reader.Complete(); }; var reading = doRead(); id = Thread.CurrentThread.ManagedThreadId; - var buffer = pipe.Alloc(); + var buffer = pipe.Writer.Alloc(); buffer.Write(Encoding.UTF8.GetBytes("Hello World")); await buffer.FlushAsync(); - pipe.CompleteWriter(); + pipe.Writer.Complete(); await reading; } @@ -131,7 +131,7 @@ public async Task DefaultReaderSchedulerRunsInline() [Fact] public async Task DefaultWriterSchedulerRunsInline() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var pipe = factory.Create(new PipeOptions { @@ -139,7 +139,7 @@ public async Task DefaultWriterSchedulerRunsInline() MaximumSizeHigh = 64 }); - var writableBuffer = pipe.Alloc(64); + var writableBuffer = pipe.Writer.Alloc(64); writableBuffer.Advance(64); var flushAsync = writableBuffer.FlushAsync(); @@ -151,20 +151,20 @@ public async Task DefaultWriterSchedulerRunsInline() { await flushAsync; - pipe.CompleteWriter(); + pipe.Writer.Complete(); Assert.Equal(Thread.CurrentThread.ManagedThreadId, id); }; var writing = doWrite(); - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); id = Thread.CurrentThread.ManagedThreadId; - pipe.AdvanceReader(result.Buffer.End, result.Buffer.End); + pipe.Reader.Advance(result.Buffer.End, result.Buffer.End); - pipe.CompleteReader(); + pipe.Reader.Complete(); await writing; } diff --git a/tests/System.IO.Pipelines.Tests/SocketsFacts.cs b/tests/System.IO.Pipelines.Tests/SocketsFacts.cs index e36ddd2d4be..e30d06dcaeb 100644 --- a/tests/System.IO.Pipelines.Tests/SocketsFacts.cs +++ b/tests/System.IO.Pipelines.Tests/SocketsFacts.cs @@ -151,7 +151,7 @@ public async Task RunStressPingPongTest_Socket() } } - static async Task> PingClient(IPipelineConnection connection, int messagesToSend) + static async Task> PingClient(IPipeConnection connection, int messagesToSend) { int count = 0; var watch = Stopwatch.StartNew(); @@ -205,7 +205,7 @@ static async Task> PingClient(IPipelineConnection connectio } - private static async Task PongServer(IPipelineConnection connection) + private static async Task PongServer(IPipeConnection connection) { while (true) { @@ -261,7 +261,7 @@ private static string SendBasicSocketMessage(IPEndPoint endpoint, string message } } - private async Task Echo(IPipelineConnection connection) + private async Task Echo(IPipeConnection connection) { while (true) { diff --git a/tests/System.IO.Pipelines.Tests/UnownedBufferReaderFacts.cs b/tests/System.IO.Pipelines.Tests/UnownedBufferReaderFacts.cs index 8199b38b772..6ed7c8b864c 100644 --- a/tests/System.IO.Pipelines.Tests/UnownedBufferReaderFacts.cs +++ b/tests/System.IO.Pipelines.Tests/UnownedBufferReaderFacts.cs @@ -499,7 +499,7 @@ public async Task NotCallingAdvanceWillCauseReadToThrow() [Fact] public async Task StreamAsPipelineReaderUsesUnderlyingPipelineReaderIfAvailable() { - var stream = new StreamAndPipelineReader(); + var stream = new StreamAndPipeReader(); var sw = new StreamWriter(stream); sw.Write("Hello"); sw.Flush(); @@ -529,7 +529,7 @@ public async Task StreamAsPipelineReaderUsesUnderlyingPipelineReaderIfAvailable( [Fact] public async Task StreamAsPipelineReaderReadStream() { - var stream = new StreamAndPipelineReader(); + var stream = new StreamAndPipeReader(); var sw = new StreamWriter(stream); sw.Write("Hello"); sw.Flush(); @@ -551,7 +551,7 @@ public async Task StreamAsPipelineReaderReadStream() Assert.Equal("World", Encoding.UTF8.GetString(readBuf, 0, read)); } - private class StreamAndPipelineReader : Stream, IPipelineReader + private class StreamAndPipeReader : Stream, IPipeReader { private readonly Pipe _pipe = new Pipe(ArrayBufferPool.Instance); @@ -582,16 +582,16 @@ public override long Position } } - public void CancelPendingRead() => _pipe.CancelPendingRead(); + public void CancelPendingRead() => _pipe.Reader.CancelPendingRead(); public void Advance(ReadCursor consumed, ReadCursor examined) { - _pipe.AdvanceReader(consumed, examined); + _pipe.Reader.Advance(consumed, examined); } public void Complete(Exception exception = null) { - _pipe.CompleteReader(exception); + _pipe.Reader.Complete(exception); } public override void Flush() @@ -611,7 +611,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, public ReadableBufferAwaitable ReadAsync() { - return _pipe.ReadAsync(); + return _pipe.Reader.ReadAsync(); } public override long Seek(long offset, SeekOrigin origin) @@ -634,14 +634,14 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati return _pipe.WriteAsync(new Span(buffer, offset, count)); } - public void FinishWriting() => _pipe.CompleteWriter(); + public void FinishWriting() => _pipe.Writer.Complete(); protected override void Dispose(bool disposing) { base.Dispose(disposing); - _pipe.CompleteReader(); - _pipe.CompleteWriter(); + _pipe.Reader.Complete(); + _pipe.Writer.Complete(); } } diff --git a/tests/System.IO.Pipelines.Tests/WritableBufferFacts.cs b/tests/System.IO.Pipelines.Tests/WritableBufferFacts.cs index d46d4d0758a..39b6fe3b58c 100644 --- a/tests/System.IO.Pipelines.Tests/WritableBufferFacts.cs +++ b/tests/System.IO.Pipelines.Tests/WritableBufferFacts.cs @@ -17,7 +17,7 @@ public async Task CanWriteNothingToBuffer() using (var memoryPool = new MemoryPool()) { var pipe = new Pipe(memoryPool); - var buffer = pipe.Alloc(); + var buffer = pipe.Writer.Alloc(); buffer.Advance(0); // doing nothing, the hard way await buffer.FlushAsync(); } @@ -35,11 +35,11 @@ public async Task CanWriteUInt64ToBuffer(ulong value, string valueAsString) using (var memoryPool = new MemoryPool()) { var pipe = new Pipe(memoryPool); - var buffer = pipe.Alloc(); + var buffer = pipe.Writer.Alloc(); buffer.Append(value, EncodingData.InvariantUtf8); await buffer.FlushAsync(); - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); var inputBuffer = result.Buffer; Assert.Equal(valueAsString, inputBuffer.GetUtf8String()); @@ -60,16 +60,16 @@ public async Task WriteLargeDataBinary(int length) { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); output.Write(data); var foo = output.Memory.IsEmpty; // trying to see if .Memory breaks await output.FlushAsync(); - pipe.CompleteWriter(); + pipe.Writer.Complete(); int offset = 0; while (true) { - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); var input = result.Buffer; if (input.Length == 0) break; @@ -95,16 +95,16 @@ public async Task WriteLargeDataTextUtf8(int length) { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); output.Append(data, EncodingData.InvariantUtf8); var foo = output.Memory.IsEmpty; // trying to see if .Memory breaks await output.FlushAsync(); - pipe.CompleteWriter(); + pipe.Writer.Complete(); int offset = 0; while (true) { - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); var input = result.Buffer; if (input.Length == 0) break; @@ -130,16 +130,16 @@ public async Task WriteLargeDataTextAscii(int length) { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); output.Append(data, EncodingData.InvariantUtf8); var foo = output.Memory.IsEmpty; // trying to see if .Memory breaks await output.FlushAsync(); - pipe.CompleteWriter(); + pipe.Writer.Complete(); int offset = 0; while (true) { - var result = await pipe.ReadAsync(); + var result = await pipe.Reader.ReadAsync(); var input = result.Buffer; if (input.Length == 0) break; @@ -171,7 +171,7 @@ public void CanReReadDataThatHasNotBeenCommitted_SmallData() using (var memoryPool = new MemoryPool()) { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); Assert.True(output.AsReadableBuffer().IsEmpty); Assert.Equal(0, output.AsReadableBuffer().Length); @@ -210,7 +210,7 @@ public void CanReReadDataThatHasNotBeenCommitted_LargeData() { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); byte[] predictablyGibberish = new byte[512]; const int SEED = 1235412; @@ -251,7 +251,7 @@ public async Task CanAppendSelfWhileEmpty() { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); var readable = output.AsReadableBuffer(); output.Append(readable); Assert.Equal(0, output.AsReadableBuffer().Length); @@ -269,7 +269,7 @@ public async Task CanAppendSelfWhileNotEmpty() { var pipe = new Pipe(memoryPool); - var output = pipe.Alloc(); + var output = pipe.Writer.Alloc(); for (int i = 0; i < 20; i++) { @@ -291,10 +291,10 @@ public async Task CanAppendSelfWhileNotEmpty() [Fact] public void EnsureMoreThanPoolBlockSizeThrows() { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var pipe = factory.Create(); - var buffer = pipe.Alloc(); + var buffer = pipe.Writer.Alloc(); Assert.Throws(() => buffer.Ensure(8192)); } } @@ -315,10 +315,10 @@ public static IEnumerable HexNumbers [MemberData(nameof(HexNumbers))] public void WriteHex(int value, string hex) { - using (var factory = new PipelineFactory()) + using (var factory = new PipeFactory()) { var pipe = factory.Create(); - var buffer = pipe.Alloc(); + var buffer = pipe.Writer.Alloc(); buffer.Append(value, EncodingData.InvariantUtf8, 'x'); Assert.Equal(hex, buffer.AsReadableBuffer().GetAsciiString());