Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async channel service #22

Merged
merged 5 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,22 +306,48 @@ services.AddTransient<IChannelEvents, MyChannelEvents>();

## Channel Services

Sometimes it might be required to execute a long-running service inside a channel. This can be achieved by creating a class that implements `IChannelService` interface.
Sometimes it might be required to execute a long-running service inside a channel. The easiest and fastest way is to create a worker service by inheriting `ChannelService` abstract class.

```csharp
public class MyService : ChannelService
{
protected override async Task ExecuteAsync( CancellationToken cancellationToken )
{
while ( !cancellationToken.IsCancellationRequested )
{
// insert code...

/*
here we have access to the channel instance through the Channel property
*/

// Channel.WriteAsync( ... );

await Task.Delay( 1000 );
}
}
}
```

However, if you need need better control or a different approach, you can create a class that implements the `IChannelService` interface instead.

```csharp
public class MyService : IChannelService
{
// ...

public void Start( IChannel channel )
public Task StartAsync( IChannel channel, CancellationToken cancellationToken )
{
// This is invoked when a channel is created
// Invoked when a channel is created
}

public void Stop()
public Task StopAsync( CancellationToken cancellationToken )
{
// This is invoked when a channel is closed
// Invoked when a channel is closed
}

public void Dispose()
{ }
}
```

Expand Down
4 changes: 2 additions & 2 deletions src/abstractions/IChannelService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ public interface IChannelService : IDisposable
/// <summary>
/// Starts the service. Invoked when a channel is created.
/// </summary>
void Start( IChannel channel );
Task StartAsync( IChannel channel, CancellationToken cancellationToken = default );

/// <summary>
/// Stops the service. Invoked when a channel is closed.
/// </summary>
void Stop();
Task StopAsync( CancellationToken cancellationToken = default );
}
10 changes: 6 additions & 4 deletions src/buffers/ByteBufferExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,33 @@ public static class ByteBufferExtensions
/// Creates a read-only buffer from the given source
/// </summary>
/// <param name="source">The source buffer</param>
/// <param name="endianness">The buffer instance endianness; if null, the source buffer endianness is used.</param>
/// <returns>A read-only buffer instance</returns>
public static IByteBuffer MakeReadOnly( this IByteBuffer source )
public static IByteBuffer MakeReadOnly( this IByteBuffer source, Endianness? endianness = null )
{
if ( source.IsReadable && !source.IsWritable )
{
// already read-only
return ( source );
}

return new WrappedByteBuffer( source.ToArray(), source.Endianness );
return new WrappedByteBuffer( source.ToArray(), endianness ?? source.Endianness );
}

/// <summary>
/// Creates a writable buffer from the given source
/// </summary>
/// <param name="source">The source buffer</param>
/// <param name="endianness">The buffer instance endianness; if null, the source buffer endianness is used.</param>
/// <returns>A writable buffer instance</returns>
public static IByteBuffer MakeWritable( this IByteBuffer source )
public static IByteBuffer MakeWritable( this IByteBuffer source, Endianness? endianness = null )
{
if ( source.IsWritable )
{
return ( source );
}

return new WritableByteBuffer( source.ToArray(), source.Endianness );
return new WritableByteBuffer( source.ToArray(), endianness ?? source.Endianness );
}

/// <summary>
Expand Down
7 changes: 5 additions & 2 deletions src/channels/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Channel( IServiceScope serviceScope

// notify channel created and start long-running services
this.NotifyChannelCreated();
this.StartChannelServices();
Task.Run( () => this.StartChannelServicesAsync() );
}

internal IServiceProvider ServiceProvider => channelScope.ServiceProvider;
Expand Down Expand Up @@ -124,7 +124,10 @@ protected override void OnDisconnected()

try
{
this.StopChannelServices();
Task.Run( () => this.StopChannelServicesAsync() )
.ConfigureAwait( false )
.GetAwaiter()
.GetResult();
}
catch ( Exception )
{ }
Expand Down
8 changes: 4 additions & 4 deletions src/channels/ChannelServiceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ namespace Faactory.Channels;

internal static class ChannelServiceExtensions
{
public static void StartChannelServices( this Channel channel )
public static Task StartChannelServicesAsync( this Channel channel, CancellationToken cancellationToken = default )
=> channel.GetChannelServices()
.InvokeAll( x => x.Start( channel ) );
.InvokeAllAsync( x => x.StartAsync( channel, cancellationToken ) );

public static void StopChannelServices( this Channel channel )
public static Task StopChannelServicesAsync( this Channel channel, CancellationToken cancellationToken = default )
=> channel.GetChannelServices()
.InvokeAll( x => x.Stop() );
.InvokeAllAsync( x => x.StopAsync( cancellationToken ) );

private static IEnumerable<IChannelService> GetChannelServices( this Channel channel )
=> channel.ServiceProvider.GetServices<IChannelService>();
Expand Down
11 changes: 11 additions & 0 deletions src/channels/EnumerableActionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,15 @@ public static void InvokeAll<T>( this IEnumerable<T> services, Action<T> action
action.Invoke( service );
}
}

/// <summary>
/// Encapsulates a method with a single parameter for each item in the collection
/// </summary>
public static async Task InvokeAllAsync<T>( this IEnumerable<T> services, Func<T, Task> action )
{
var tasks = services.Select( service => action.Invoke( service ) );

await Task.WhenAll( tasks )
.ConfigureAwait( false );
}
}
53 changes: 53 additions & 0 deletions src/channels/Services/ChannelService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace Faactory.Channels;

public abstract class ChannelService : IChannelService
{
private Task? task;
private CancellationTokenSource? cancellationTokenSource;

public IChannel? Channel { get; private set; }

public virtual Task StartAsync( IChannel channel, CancellationToken cancellationToken )
{
Channel = channel;

cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken );

task = ExecuteAsync( cancellationTokenSource.Token );

return task.IsCompleted
? task
: Task.CompletedTask;
}

public virtual async Task StopAsync( CancellationToken cancellationToken )
{
if ( task is null )
{
return;
}

try
{
cancellationTokenSource?.Cancel();
}
finally
{
var taskCompletion = new TaskCompletionSource<object>();

using ( CancellationTokenRegistration tokenRegistration = cancellationToken.Register( x => ((TaskCompletionSource<object>?)x)?.SetCanceled(), taskCompletion ) )
{
await Task.WhenAny( task, taskCompletion.Task )
.ConfigureAwait( false );
}
}
}

public virtual void Dispose()
{
cancellationTokenSource?.Cancel();
Channel = null;
}

protected abstract Task ExecuteAsync( CancellationToken cancellationToken );
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ILoggerFactory loggerFactory
}
}

public void Start( IChannel channel )
public Task StartAsync( IChannel channel, CancellationToken cancellationToken )
{
logger.LogDebug( "Starting..." );

Expand All @@ -37,7 +37,7 @@ public void Start( IChannel channel )
// not enabled
logger.LogInformation( "Canceled. Idle detection mode is set to 'None'." );

return;
return Task.CompletedTask;
}

var dueTime = ( detectionMode == IdleDetectionMode.Auto )
Expand All @@ -53,9 +53,11 @@ public void Start( IChannel channel )
, dueTime, intervalTime );

logger.LogInformation( "Started." );

return Task.CompletedTask;
}

public void Stop()
public Task StopAsync( CancellationToken cancellationToken )
{
logger.LogDebug( "Stopping..." );

Expand All @@ -64,7 +66,7 @@ public void Stop()
// not active
logger.LogDebug( "Already stopped." );

return;
return Task.CompletedTask;
}

try
Expand All @@ -80,6 +82,8 @@ public void Stop()
}

logger.LogInformation( "Stopped." );

return Task.CompletedTask;
}

public void Dispose()
Expand All @@ -97,7 +101,10 @@ private void IdleDetectionTimeoutCallback( object? state )
{
logger.LogWarning( $"Channel doesn't seem to be active anymore. Closing..." );

Stop();
StopAsync( CancellationToken.None )
.ConfigureAwait( false )
.GetAwaiter()
.GetResult();

channel.CloseAsync()
.ConfigureAwait( false )
Expand Down Expand Up @@ -149,7 +156,10 @@ private void IdleDetectionTimeoutCallback( object? state )
var seconds = (int)timeout.TotalSeconds;
logger.LogWarning( $"Channel has been idle for more than {seconds} seconds. Closing..." );

Stop();
StopAsync( CancellationToken.None )
.ConfigureAwait( false )
.GetAwaiter()
.GetResult();

channel.CloseAsync()
.ConfigureAwait( false )
Expand Down
9 changes: 7 additions & 2 deletions tests/ScopedServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Faactory.Channels;
using Faactory.Channels.Adapters;
Expand All @@ -22,14 +23,18 @@ private class MyService : IChannelService
public void Dispose()
{ }

public void Start( IChannel channel )
public Task StartAsync( IChannel channel, CancellationToken cancellationToken )
{
Status = "started";

return Task.CompletedTask;
}

public void Stop()
public Task StopAsync( CancellationToken cancellationToken )
{
Status = "stopped";

return Task.CompletedTask;
}
}

Expand Down