Skip to content

Commit

Permalink
Merge branch 'master' into async
Browse files Browse the repository at this point in the history
  • Loading branch information
electricessence authored Apr 16, 2018
2 parents 82ecbbc + b4f9a52 commit f094ce0
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 226 deletions.
124 changes: 65 additions & 59 deletions AsyncFileWriter/AsyncFileWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Diagnostics.Contracts;
using System.IO;
using System.Text;
Expand All @@ -9,12 +9,14 @@

namespace Open
{
public class AsyncFileWriter : IDisposableAsync, ITargetBlock<byte[]>, ITargetBlock<char[]>, ITargetBlock<string>
public class AsyncFileWriter : IDisposable, ITargetBlock<byte[]>, ITargetBlock<char[]>, ITargetBlock<string>
{
public readonly string FilePath;
public readonly int BoundedCapacity;
public readonly Encoding Encoding;
public readonly FileShare FileShareMode;
public readonly bool AsyncFileStream;
public readonly int BufferSize;

bool _declinePermanently;
readonly Channel<byte[]> _channel;
Expand All @@ -27,15 +29,22 @@ public class AsyncFileWriter : IDisposableAsync, ITargetBlock<byte[]>, ITargetBl
/// <param name="boundedCapacity">The maximum number of entries to allow before blocking producing threads.</param>
/// <param name="encoding">The encoding type to use for transforming strings and characters to bytes. The default is UTF8.</param>
/// <param name="fileSharingMode">The file sharing mode to use. The default is FileShare.None (will not allow multiple writers). </param>
public AsyncFileWriter(string filePath, int boundedCapacity, Encoding encoding = null, FileShare fileSharingMode = FileShare.None)
public AsyncFileWriter(string filePath, int boundedCapacity, Encoding encoding = null, FileShare fileSharingMode = FileShare.None, int bufferSize = 4096 * 4, bool asyncFileStream = false)
{
FilePath = filePath ?? throw new ArgumentNullException(nameof(filePath));
BoundedCapacity = boundedCapacity;
Encoding = encoding ?? Encoding.UTF8;
FileShareMode = fileSharingMode;
BufferSize = bufferSize;
AsyncFileStream = asyncFileStream;

_channel = Channel.CreateBounded<byte[]>(boundedCapacity);
Completion = ProcessBytes();
Completion = ProcessBytesAsync()
.ContinueWith(
t => t.IsCompleted
? _channel.Reader.Completion
: t)
.Unwrap(); // Propagate the task state...
}

#endregion
Expand All @@ -57,27 +66,38 @@ public Task Complete()
}
#endregion

async Task ProcessBytes()
async Task ProcessBytesAsync()
{
var reader = _channel.Reader;
using (var fs = new FileStream(FilePath, FileMode.Append, FileAccess.Write, FileShareMode, bufferSize: 4096 * 4 /*, useAsync: true */))
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
while (await reader.WaitToReadAsync().ConfigureAwait(false))
using (var fs = new FileStream(FilePath, FileMode.Append, FileAccess.Write, FileShareMode, bufferSize: BufferSize, useAsync: AsyncFileStream))
{
while (reader.TryRead(out byte[] bytes))
if (AsyncFileStream)
{
//await fs.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false);
fs.Write(bytes, 0, bytes.Length);
Task writeTask = Task.CompletedTask;
while (reader.TryRead(out byte[] bytes))
{
await writeTask.ConfigureAwait(false);
writeTask = fs.WriteAsync(bytes, 0, bytes.Length);
}

await writeTask.ConfigureAwait(false);
// FlushAsync here rather than block in Dispose on Flush
await fs.FlushAsync().ConfigureAwait(false);
}
else
{
while (reader.TryRead(out byte[] bytes))
{
fs.Write(bytes, 0, bytes.Length);
}
}
}

// FlushAsync here rather than block in Dispose on Flush
await fs.FlushAsync().ConfigureAwait(false);
}

await _channel.Reader.Completion.ConfigureAwait(false);
}

#region Add (queue) data methods.
void AssertWritable(bool writing)
{
if (!writing)
Expand All @@ -93,32 +113,19 @@ void AssertWritable(bool writing)
/// Queues bytes for writing to the file.
/// If the .Complete() method was called, this will throw an InvalidOperationException.
/// </summary>
public async Task AddAsync(byte[] bytes)
public async Task AddAsync(byte[] bytes, params byte[][] more)
{
if (bytes == null) throw new ArgumentNullException(nameof(bytes));
Contract.EndContractBlock();

if (_disposeState != 0) throw new ObjectDisposedException(GetType().ToString());
if (_disposer != null) throw new ObjectDisposedException(GetType().ToString());
AssertWritable(!_declinePermanently);

while (!_channel.Writer.TryWrite(bytes))
{
var written = await _channel.Writer.WaitToWriteAsync().ConfigureAwait(false);
AssertWritable(written);
}
}

/// <summary>
/// Queues bytes for writing to the file.
/// If the .Complete() method was called, this will throw an InvalidOperationException.
/// </summary>
public async Task AddAsync(byte[] bytes, params byte[][] more)
{
if (bytes == null) throw new ArgumentNullException(nameof(bytes));
Contract.EndContractBlock();

if (_disposeState != 0) throw new ObjectDisposedException(GetType().ToString());

await AddAsync(bytes);

if (more.Length != 0) foreach (var v in more) await AddAsync(v);
}
Expand Down Expand Up @@ -163,47 +170,46 @@ public async Task AddLineAsync(string line = null, params string[] more)

if (more.Length != 0) foreach (var v in more) await AddLineAsync(v);
}
#endregion

#region IDisposable Support
int _disposeState = 0;
Lazy<Task> _disposer;

protected virtual async Task DisposeAsync(bool calledExplicitly)
{
if (0 == Interlocked.CompareExchange(ref _disposeState, 0, 1))
{
if (calledExplicitly)
{
await Complete().ConfigureAwait(false);
}
else
protected Task DisposeAsync(bool calledExplicitly)
// EnsureInitialized is optimistic.
=> LazyInitializer.EnsureInitialized(ref _disposer,
// Lazy is pessimistic.
() => new Lazy<Task>(() => Task.Run(async () =>
{
// Left for the GC? :(
_channel.Writer.TryComplete(); // First try and mark as complete as if normal.
_channel.Writer.TryComplete(new ObjectDisposedException(GetType().ToString()));
}
if (calledExplicitly)
{
await Complete().ConfigureAwait(false);
}
else
{
// Left for the GC? :(
_channel.Writer.TryComplete(); // First try and mark as complete as if normal.
_channel.Writer.TryComplete(new ObjectDisposedException(GetType().ToString()));
}
}))).Value;

Interlocked.Exchange(ref _disposeState, 2);
}
public async Task DisposeAsync()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
await DisposeAsync(true).ConfigureAwait(false);
// TODO: uncomment the following line if the finalizer is overridden above.
GC.SuppressFinalize(this);
}

~AsyncFileWriter()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
DisposeAsync(false).Wait();
}

/// <summary>
/// Signals completion and waits for all bytes to be written to the destination.
/// If immediately cancellation of activity is required, call .CompleteImmediate() before disposing.
/// </summary>
public async Task DisposeAsync()
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
await DisposeAsync(true).ConfigureAwait(false);
// TODO: uncomment the following line if the finalizer is overridden above.
GC.SuppressFinalize(this);
DisposeAsync(true).Wait();
}

#endregion

#region ITargetBlock
Expand Down Expand Up @@ -232,7 +238,7 @@ public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, s

void IDataflowBlock.Complete()
{
this.Complete().Wait();
Complete();
}

public void Fault(Exception exception)
Expand Down
75 changes: 75 additions & 0 deletions AsyncFileWriterTester/AsyncTester.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using Open;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace AsyncFileWriterTester
{
public class AsyncTester
{
public readonly string FileName;

public AsyncTester(string fileName = "AsyncFileWriterTest.txt")
{
FileName = fileName ?? throw new ArgumentNullException(nameof(fileName));
}

public async Task<(int TotalBytesQueued, TimeSpan AggregateTimeWaiting, TimeSpan Elapsed)> Run(Func<string, Func<Func<string, Task>, Task>, Task> context)
{
if (context == null) throw new ArgumentNullException(nameof(context));
Contract.EndContractBlock();

var dir = Environment.CurrentDirectory;
var filePath = Path.Combine(dir, FileName);
File.Delete(filePath); // Start from scratch. (comment out for further appends.)

var telemetry = new ConcurrentBag<(int bytes, TimeSpan time)>();

var sw = Stopwatch.StartNew();
await context(filePath, async writeHandler =>
{
void write(int i)
{
var message = $"{i}) {DateTime.Now} 00000000000000000000000000000000111111111111111111111111111222222222222222222222222222\n";
var t = Stopwatch.StartNew();
writeHandler(message).Wait();
telemetry.Add((message.Length, t.Elapsed));
}

Parallel.For(0, 10000, write);
Parallel.For(10000, 20000, write);

//writer.Fault(new Exception("Stop!"));

Task.Delay(1).Wait();
Parallel.For(20000, 100000, write);

Task.Delay(1000).Wait(); // Demonstrate that when nothing buffered the active stream closes.
Parallel.For(100000, 1000000, write);

await Task.Yield();
});
sw.Stop();

var (bytes, time) = telemetry.Aggregate((a, b) => (a.bytes + b.bytes, a.time + b.time));
return (bytes, time, sw.Elapsed);
}

public static async Task RunAndReportToConsole(Func<string, Func<Func<string, Task>, Task>, Task> context, string fileName = "AsyncFileWriterTest.txt")
=> (await new AsyncTester(fileName).Run(context)).EmitToConsole();

public static Task TestAsyncFileWriter(int boundedCapacity = -1)
{
Console.WriteLine("{0:#,##0} bounded capacity.", boundedCapacity);
return RunAndReportToConsole(async (filePath, handler) =>
{
using (var writer = new AsyncFileWriter(filePath, boundedCapacity, asyncFileStream: true))
await handler(s => writer.AddAsync(s));
});
}
}
}
16 changes: 16 additions & 0 deletions AsyncFileWriterTester/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;

namespace AsyncFileWriterTester
{
public static class Extensions
{
public static void EmitToConsole(this (int TotalBytesQueued, TimeSpan AggregateTimeWaiting, TimeSpan Elapsed) run)
{
Console.WriteLine("Total Time: {0} seconds", run.Elapsed.TotalSeconds);
Console.WriteLine("Total Bytes: {0:#,##0}", run.TotalBytesQueued);
Console.WriteLine("Aggregate Waiting: {0}", run.AggregateTimeWaiting);
Console.WriteLine("------------------------");
Console.WriteLine();
}
}
}
Loading

0 comments on commit f094ce0

Please sign in to comment.