diff --git a/Thor.sln b/Thor.sln index addf530..89c991c 100644 --- a/Thor.sln +++ b/Thor.sln @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Thor.Moonshot", "src\extens EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thor.Core", "src\framework\Thor.Core\Thor.Core.csproj", "{88790F95-50F1-4A17-8BE8-287D68E433CD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thor.RabbitMQEvent", "src\framework\Thor.RabbitMQEvent\Thor.RabbitMQEvent.csproj", "{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -131,6 +133,10 @@ Global {88790F95-50F1-4A17-8BE8-287D68E433CD}.Debug|Any CPU.Build.0 = Debug|Any CPU {88790F95-50F1-4A17-8BE8-287D68E433CD}.Release|Any CPU.ActiveCfg = Release|Any CPU {88790F95-50F1-4A17-8BE8-287D68E433CD}.Release|Any CPU.Build.0 = Release|Any CPU + {C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -156,6 +162,7 @@ Global {84C27CF3-BD3C-46D6-A774-967DB5D3F060} = {1035B36B-8194-43D4-8A4A-992D962CD1D8} {4D98C74B-D071-430C-9956-3A9B8CF642DD} = {294B7A34-48F5-43AB-A1D3-033480559E3A} {88790F95-50F1-4A17-8BE8-287D68E433CD} = {1035B36B-8194-43D4-8A4A-992D962CD1D8} + {C1F516B3-E06A-4F2A-8465-FC3D8C7645D3} = {1035B36B-8194-43D4-8A4A-992D962CD1D8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8A80931C-B951-4EF2-AC79-457E73118E5F} diff --git a/src/Thor.Service/EventBus/ChannelEventHandler.cs b/src/Thor.Service/EventBus/ChannelEventHandler.cs index da2f3f7..97bc0bf 100644 --- a/src/Thor.Service/EventBus/ChannelEventHandler.cs +++ b/src/Thor.Service/EventBus/ChannelEventHandler.cs @@ -7,69 +7,26 @@ namespace Thor.Service.EventBus; public sealed class ChannelEventHandler : IEventHandler, IDisposable { private readonly ILogger _logger; - private readonly IServiceProvider _serviceProvider; private readonly IServiceScope _scope; - private readonly Timer _timer; - private readonly Channel _events = Channel.CreateUnbounded(); - private readonly SemaphoreSlim _semaphore = new(1, 1); - - private const int Interval = 30000; + private readonly LoggerDbContext _loggerDbContext; public ChannelEventHandler(IServiceProvider serviceProvider, ILogger logger) { _logger = logger; _scope = serviceProvider.CreateScope(); - _serviceProvider = _scope.ServiceProvider; - _timer = new Timer(Flush, null, Interval, Interval); + _loggerDbContext = _scope.ServiceProvider.GetRequiredService(); } public async Task HandleAsync(ChatLogger @event) { - await _events.Writer.WriteAsync(@event); - } - - private async void Flush(object state) - { - await FlushAsync(); - } - - private async Task FlushAsync() - { - await _semaphore.WaitAsync(); - - try - { - var currentEvents = new List(); - - _logger.LogInformation("Flushing events to database"); - - while (_events.Reader.TryRead(out var value)) - { - currentEvents.Add(value); - } - - _logger.LogInformation($"Flushing {currentEvents.Count} events to database"); - - if (!currentEvents.Any()) - { - return; - } - - var loggerDbContext = _serviceProvider.GetRequiredService(); - await loggerDbContext.Loggers.AddRangeAsync(currentEvents); - await loggerDbContext.SaveChangesAsync(); + await _loggerDbContext.Loggers.AddAsync(@event); + await _loggerDbContext.SaveChangesAsync(); - _logger.LogInformation("Events flushed to database"); - } - finally - { - _semaphore.Release(); - } + _logger.LogInformation("ChatLogger event received"); } public void Dispose() { - _timer.Dispose(); _scope.Dispose(); } } \ No newline at end of file diff --git a/src/Thor.Service/Program.cs b/src/Thor.Service/Program.cs index 4d27915..79772a5 100644 --- a/src/Thor.Service/Program.cs +++ b/src/Thor.Service/Program.cs @@ -27,6 +27,7 @@ using Thor.Service.Service; using Thor.SparkDesk.Extensions; using Thor.Abstractions.Chats.Dtos; +using Thor.RabbitMQEvent; try { @@ -62,12 +63,22 @@ builder.Services.AddRedisMemory(CacheOptions.ConnectionString); } + var rabbitMQConnectionString = builder.Configuration["RabbitMQ:ConnectionString"]; + if (!string.IsNullOrEmpty(rabbitMQConnectionString)) + { + builder.Services.AddRabbitMQEventBus(builder.Configuration); + } + else + { + builder.Services.AddLocalEventBus(); + } + + builder.Services.AddMvcCore().AddApiExplorer(); builder.Services .AddEndpointsApiExplorer() .AddSwaggerGen() .AddSingleton, ChannelEventHandler>() - .AddLocalEventBus() .AddCustomAuthentication() .AddHttpContextAccessor() .AddTransient() diff --git a/src/Thor.Service/Thor.Service.csproj b/src/Thor.Service/Thor.Service.csproj index d95aba2..32c863b 100644 --- a/src/Thor.Service/Thor.Service.csproj +++ b/src/Thor.Service/Thor.Service.csproj @@ -55,6 +55,7 @@ + diff --git a/src/Thor.Service/appsettings.json b/src/Thor.Service/appsettings.json index a2143e9..70d8937 100644 --- a/src/Thor.Service/appsettings.json +++ b/src/Thor.Service/appsettings.json @@ -13,6 +13,10 @@ "DBType": "sqlite" // sqlite | [postgresql,pgsql] | [sqlserver,mssql] | mysql }, + "RabbitMQ": { + "ConnectionString": "" + // amqp://token:dd666666@localhost:5672 + }, "Chat": { "Master": "", "Shared": { diff --git a/src/framework/Thor.RabbitMQEvent/EventEto.cs b/src/framework/Thor.RabbitMQEvent/EventEto.cs new file mode 100644 index 0000000..77eadef --- /dev/null +++ b/src/framework/Thor.RabbitMQEvent/EventEto.cs @@ -0,0 +1,14 @@ +namespace Thor.RabbitMQEvent; + +public class EventEto +{ + public string FullName { get; set; } + + public byte[] Data { get; set; } + + public EventEto(string fullName, byte[] data) + { + FullName = fullName; + Data = data; + } +} \ No newline at end of file diff --git a/src/framework/Thor.RabbitMQEvent/RabbitMQEventBus.cs b/src/framework/Thor.RabbitMQEvent/RabbitMQEventBus.cs new file mode 100644 index 0000000..20d4593 --- /dev/null +++ b/src/framework/Thor.RabbitMQEvent/RabbitMQEventBus.cs @@ -0,0 +1,16 @@ +using System.Text.Json; +using Raccoon.Stack.Rabbit; +using Thor.BuildingBlocks.Data; + +namespace Thor.RabbitMQEvent; + +public class RabbitMQEventBus(RabbitClient rabbitClient) : IEventBus where TEvent : class +{ + public async ValueTask PublishAsync(TEvent @event) + { + var eto = new EventEto(@event.GetType().FullName, JsonSerializer.SerializeToUtf8Bytes(@event)); + + await rabbitClient.PublishAsync("Thor:EventBus:exchange", "Thor:EventBus:key", + JsonSerializer.SerializeToUtf8Bytes(eto)); + } +} \ No newline at end of file diff --git a/src/framework/Thor.RabbitMQEvent/RabbitMQEventHandler.cs b/src/framework/Thor.RabbitMQEvent/RabbitMQEventHandler.cs new file mode 100644 index 0000000..08d7a61 --- /dev/null +++ b/src/framework/Thor.RabbitMQEvent/RabbitMQEventHandler.cs @@ -0,0 +1,42 @@ +using System.Reflection; +using System.Text.Json; +using Microsoft.Extensions.DependencyInjection; +using RabbitMQ.Client.Events; +using Raccoon.Stack.Rabbit; +using Raccoon.Stack.Rabbit.Handler; +using Thor.BuildingBlocks.Data; + +namespace Thor.RabbitMQEvent; + +public class RabbitMQEventHandler : IRabbitHandler +{ + public bool Enable(ConsumeOptions options) + { + return options.Queue.Equals("Thor:EventBus", StringComparison.OrdinalIgnoreCase); + } + + public async Task Handle(IServiceProvider sp, BasicDeliverEventArgs args, ConsumeOptions options) + { + var eto = JsonSerializer.Deserialize(args.Body.ToArray()); + + // type : Thor.Service.Domain.ChatLogger + + var type = Assembly.GetEntryAssembly()?.GetType(eto.FullName); + + if (type == null) + { + return; + } + + var @event = JsonSerializer.Deserialize(eto.Data, type); + + // IEventHandler + var handlerType = typeof(IEventHandler<>).MakeGenericType(type); + + var handler = sp.GetRequiredService(handlerType); + + var method = handlerType.GetMethod("HandleAsync"); + + await (Task)method.Invoke(handler, new object[] { @event }); + } +} \ No newline at end of file diff --git a/src/framework/Thor.RabbitMQEvent/ServiceCollectionExtensions.cs b/src/framework/Thor.RabbitMQEvent/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..803dc22 --- /dev/null +++ b/src/framework/Thor.RabbitMQEvent/ServiceCollectionExtensions.cs @@ -0,0 +1,45 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using RabbitMQ.Client; +using Raccoon.Stack.Rabbit; +using Thor.BuildingBlocks.Data; + +namespace Thor.RabbitMQEvent; + +public static class ServiceCollectionExtensions +{ + public static IServiceCollection AddRabbitMQEventBus(this IServiceCollection services, IConfiguration configuration) + { + // 是否启用RabbitMQ + var connection = configuration["RabbitMQ:ConnectionString"]; + if (string.IsNullOrWhiteSpace(connection)) + { + return services; + } + + services + .AddSingleton(typeof(IEventBus<>), typeof(RabbitMQEventBus<>)); + + services.AddRabbitBoot((options => + { + options.ConnectionString = connection; + options.Consumes = + [ + new ConsumeOptions + { + AutoAck = false, + FetchCount = 10, + Queue = "Thor:EventBus", + Declaration = (declaration => + { + declaration.QueueDeclareAsync("Thor:EventBus", true); + declaration.ExchangeDeclareAsync("Thor:EventBus:exchange", ExchangeType.Direct, true); + declaration.QueueBindAsync("Thor:EventBus", "Thor:EventBus:exchange", "Thor:EventBus:key"); + }) + } + ]; + }), typeof(RabbitMQEventBus<>).Assembly); + + return services; + } +} \ No newline at end of file diff --git a/src/framework/Thor.RabbitMQEvent/Thor.RabbitMQEvent.csproj b/src/framework/Thor.RabbitMQEvent/Thor.RabbitMQEvent.csproj new file mode 100644 index 0000000..de39e77 --- /dev/null +++ b/src/framework/Thor.RabbitMQEvent/Thor.RabbitMQEvent.csproj @@ -0,0 +1,17 @@ + + + + net8.0 + enable + enable + + + + + + + + + + +