diff --git a/README.md b/README.md index f1503dc..1d0e562 100644 --- a/README.md +++ b/README.md @@ -306,22 +306,48 @@ services.AddTransient(); ## Channel Services -Sometimes it might be required to execute a long-running service inside a channel. This can be achieved by creating a class that implements `IChannelService` interface. +Sometimes it might be required to execute a long-running service inside a channel. The easiest and fastest way is to create a worker service by inheriting `ChannelService` abstract class. + +```csharp +public class MyService : ChannelService +{ + protected override async Task ExecuteAsync( CancellationToken cancellationToken ) + { + while ( !cancellationToken.IsCancellationRequested ) + { + // insert code... + + /* + here we have access to the channel instance through the Channel property + */ + + // Channel.WriteAsync( ... ); + + await Task.Delay( 1000 ); + } + } +} +``` + +However, if you need need better control or a different approach, you can create a class that implements the `IChannelService` interface instead. ```csharp public class MyService : IChannelService { // ... - public void Start( IChannel channel ) + public Task StartAsync( IChannel channel, CancellationToken cancellationToken ) { - // This is invoked when a channel is created + // Invoked when a channel is created } - public void Stop() + public Task StopAsync( CancellationToken cancellationToken ) { - // This is invoked when a channel is closed + // Invoked when a channel is closed } + + public void Dispose() + { } } ``` diff --git a/src/abstractions/IChannelService.cs b/src/abstractions/IChannelService.cs index c1d8254..086a070 100644 --- a/src/abstractions/IChannelService.cs +++ b/src/abstractions/IChannelService.cs @@ -8,10 +8,10 @@ public interface IChannelService : IDisposable /// /// Starts the service. Invoked when a channel is created. /// - void Start( IChannel channel ); + Task StartAsync( IChannel channel, CancellationToken cancellationToken = default ); /// /// Stops the service. Invoked when a channel is closed. /// - void Stop(); + Task StopAsync( CancellationToken cancellationToken = default ); } diff --git a/src/buffers/ByteBufferExtensions.cs b/src/buffers/ByteBufferExtensions.cs index aa3441f..2aafd66 100644 --- a/src/buffers/ByteBufferExtensions.cs +++ b/src/buffers/ByteBufferExtensions.cs @@ -6,8 +6,9 @@ public static class ByteBufferExtensions /// Creates a read-only buffer from the given source /// /// The source buffer + /// The buffer instance endianness; if null, the source buffer endianness is used. /// A read-only buffer instance - public static IByteBuffer MakeReadOnly( this IByteBuffer source ) + public static IByteBuffer MakeReadOnly( this IByteBuffer source, Endianness? endianness = null ) { if ( source.IsReadable && !source.IsWritable ) { @@ -15,22 +16,23 @@ public static IByteBuffer MakeReadOnly( this IByteBuffer source ) return ( source ); } - return new WrappedByteBuffer( source.ToArray(), source.Endianness ); + return new WrappedByteBuffer( source.ToArray(), endianness ?? source.Endianness ); } /// /// Creates a writable buffer from the given source /// /// The source buffer + /// The buffer instance endianness; if null, the source buffer endianness is used. /// A writable buffer instance - public static IByteBuffer MakeWritable( this IByteBuffer source ) + public static IByteBuffer MakeWritable( this IByteBuffer source, Endianness? endianness = null ) { if ( source.IsWritable ) { return ( source ); } - return new WritableByteBuffer( source.ToArray(), source.Endianness ); + return new WritableByteBuffer( source.ToArray(), endianness ?? source.Endianness ); } /// diff --git a/src/channels/Channel.cs b/src/channels/Channel.cs index a1245f6..7f2369b 100644 --- a/src/channels/Channel.cs +++ b/src/channels/Channel.cs @@ -36,7 +36,7 @@ public Channel( IServiceScope serviceScope // notify channel created and start long-running services this.NotifyChannelCreated(); - this.StartChannelServices(); + Task.Run( () => this.StartChannelServicesAsync() ); } internal IServiceProvider ServiceProvider => channelScope.ServiceProvider; @@ -124,7 +124,10 @@ protected override void OnDisconnected() try { - this.StopChannelServices(); + Task.Run( () => this.StopChannelServicesAsync() ) + .ConfigureAwait( false ) + .GetAwaiter() + .GetResult(); } catch ( Exception ) { } diff --git a/src/channels/ChannelServiceExtensions.cs b/src/channels/ChannelServiceExtensions.cs index 5611bd2..378bd24 100644 --- a/src/channels/ChannelServiceExtensions.cs +++ b/src/channels/ChannelServiceExtensions.cs @@ -4,13 +4,13 @@ namespace Faactory.Channels; internal static class ChannelServiceExtensions { - public static void StartChannelServices( this Channel channel ) + public static Task StartChannelServicesAsync( this Channel channel, CancellationToken cancellationToken = default ) => channel.GetChannelServices() - .InvokeAll( x => x.Start( channel ) ); + .InvokeAllAsync( x => x.StartAsync( channel, cancellationToken ) ); - public static void StopChannelServices( this Channel channel ) + public static Task StopChannelServicesAsync( this Channel channel, CancellationToken cancellationToken = default ) => channel.GetChannelServices() - .InvokeAll( x => x.Stop() ); + .InvokeAllAsync( x => x.StopAsync( cancellationToken ) ); private static IEnumerable GetChannelServices( this Channel channel ) => channel.ServiceProvider.GetServices(); diff --git a/src/channels/EnumerableActionExtensions.cs b/src/channels/EnumerableActionExtensions.cs index b524d4f..76d4395 100644 --- a/src/channels/EnumerableActionExtensions.cs +++ b/src/channels/EnumerableActionExtensions.cs @@ -10,4 +10,15 @@ public static void InvokeAll( this IEnumerable services, Action action action.Invoke( service ); } } + + /// + /// Encapsulates a method with a single parameter for each item in the collection + /// + public static async Task InvokeAllAsync( this IEnumerable services, Func action ) + { + var tasks = services.Select( service => action.Invoke( service ) ); + + await Task.WhenAll( tasks ) + .ConfigureAwait( false ); + } } diff --git a/src/channels/Services/ChannelService.cs b/src/channels/Services/ChannelService.cs new file mode 100644 index 0000000..54c117b --- /dev/null +++ b/src/channels/Services/ChannelService.cs @@ -0,0 +1,53 @@ +namespace Faactory.Channels; + +public abstract class ChannelService : IChannelService +{ + private Task? task; + private CancellationTokenSource? cancellationTokenSource; + + public IChannel? Channel { get; private set; } + + public virtual Task StartAsync( IChannel channel, CancellationToken cancellationToken ) + { + Channel = channel; + + cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource( cancellationToken ); + + task = ExecuteAsync( cancellationTokenSource.Token ); + + return task.IsCompleted + ? task + : Task.CompletedTask; + } + + public virtual async Task StopAsync( CancellationToken cancellationToken ) + { + if ( task is null ) + { + return; + } + + try + { + cancellationTokenSource?.Cancel(); + } + finally + { + var taskCompletion = new TaskCompletionSource(); + + using ( CancellationTokenRegistration tokenRegistration = cancellationToken.Register( x => ((TaskCompletionSource?)x)?.SetCanceled(), taskCompletion ) ) + { + await Task.WhenAny( task, taskCompletion.Task ) + .ConfigureAwait( false ); + } + } + } + + public virtual void Dispose() + { + cancellationTokenSource?.Cancel(); + Channel = null; + } + + protected abstract Task ExecuteAsync( CancellationToken cancellationToken ); +} diff --git a/src/channels/IdleMonitor/IdleChannelService.cs b/src/channels/Services/IdleMonitor/IdleChannelService.cs similarity index 87% rename from src/channels/IdleMonitor/IdleChannelService.cs rename to src/channels/Services/IdleMonitor/IdleChannelService.cs index 25f1a81..11cdc60 100644 --- a/src/channels/IdleMonitor/IdleChannelService.cs +++ b/src/channels/Services/IdleMonitor/IdleChannelService.cs @@ -28,7 +28,7 @@ ILoggerFactory loggerFactory } } - public void Start( IChannel channel ) + public Task StartAsync( IChannel channel, CancellationToken cancellationToken ) { logger.LogDebug( "Starting..." ); @@ -37,7 +37,7 @@ public void Start( IChannel channel ) // not enabled logger.LogInformation( "Canceled. Idle detection mode is set to 'None'." ); - return; + return Task.CompletedTask; } var dueTime = ( detectionMode == IdleDetectionMode.Auto ) @@ -53,9 +53,11 @@ public void Start( IChannel channel ) , dueTime, intervalTime ); logger.LogInformation( "Started." ); + + return Task.CompletedTask; } - public void Stop() + public Task StopAsync( CancellationToken cancellationToken ) { logger.LogDebug( "Stopping..." ); @@ -64,7 +66,7 @@ public void Stop() // not active logger.LogDebug( "Already stopped." ); - return; + return Task.CompletedTask; } try @@ -80,6 +82,8 @@ public void Stop() } logger.LogInformation( "Stopped." ); + + return Task.CompletedTask; } public void Dispose() @@ -97,7 +101,10 @@ private void IdleDetectionTimeoutCallback( object? state ) { logger.LogWarning( $"Channel doesn't seem to be active anymore. Closing..." ); - Stop(); + StopAsync( CancellationToken.None ) + .ConfigureAwait( false ) + .GetAwaiter() + .GetResult(); channel.CloseAsync() .ConfigureAwait( false ) @@ -149,7 +156,10 @@ private void IdleDetectionTimeoutCallback( object? state ) var seconds = (int)timeout.TotalSeconds; logger.LogWarning( $"Channel has been idle for more than {seconds} seconds. Closing..." ); - Stop(); + StopAsync( CancellationToken.None ) + .ConfigureAwait( false ) + .GetAwaiter() + .GetResult(); channel.CloseAsync() .ConfigureAwait( false ) diff --git a/src/channels/IdleMonitor/IdleChannelServiceOptions.cs b/src/channels/Services/IdleMonitor/IdleChannelServiceOptions.cs similarity index 100% rename from src/channels/IdleMonitor/IdleChannelServiceOptions.cs rename to src/channels/Services/IdleMonitor/IdleChannelServiceOptions.cs diff --git a/src/channels/IdleMonitor/IdleDetectionMode.cs b/src/channels/Services/IdleMonitor/IdleDetectionMode.cs similarity index 100% rename from src/channels/IdleMonitor/IdleDetectionMode.cs rename to src/channels/Services/IdleMonitor/IdleDetectionMode.cs diff --git a/tests/ScopedServiceTests.cs b/tests/ScopedServiceTests.cs index ad79671..47c6c62 100644 --- a/tests/ScopedServiceTests.cs +++ b/tests/ScopedServiceTests.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Net.Sockets; using System.Text; +using System.Threading; using System.Threading.Tasks; using Faactory.Channels; using Faactory.Channels.Adapters; @@ -22,14 +23,18 @@ private class MyService : IChannelService public void Dispose() { } - public void Start( IChannel channel ) + public Task StartAsync( IChannel channel, CancellationToken cancellationToken ) { Status = "started"; + + return Task.CompletedTask; } - public void Stop() + public Task StopAsync( CancellationToken cancellationToken ) { Status = "stopped"; + + return Task.CompletedTask; } }