Skip to content

Commit

Permalink
增加Rabbit日志消费
Browse files Browse the repository at this point in the history
  • Loading branch information
239573049 committed Sep 4, 2024
1 parent fe8fbc3 commit d058b35
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 49 deletions.
7 changes: 7 additions & 0 deletions Thor.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Thor.Moonshot", "src\extens
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thor.Core", "src\framework\Thor.Core\Thor.Core.csproj", "{88790F95-50F1-4A17-8BE8-287D68E433CD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Thor.RabbitMQEvent", "src\framework\Thor.RabbitMQEvent\Thor.RabbitMQEvent.csproj", "{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -131,6 +133,10 @@ Global
{88790F95-50F1-4A17-8BE8-287D68E433CD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{88790F95-50F1-4A17-8BE8-287D68E433CD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{88790F95-50F1-4A17-8BE8-287D68E433CD}.Release|Any CPU.Build.0 = Release|Any CPU
{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -156,6 +162,7 @@ Global
{84C27CF3-BD3C-46D6-A774-967DB5D3F060} = {1035B36B-8194-43D4-8A4A-992D962CD1D8}
{4D98C74B-D071-430C-9956-3A9B8CF642DD} = {294B7A34-48F5-43AB-A1D3-033480559E3A}
{88790F95-50F1-4A17-8BE8-287D68E433CD} = {1035B36B-8194-43D4-8A4A-992D962CD1D8}
{C1F516B3-E06A-4F2A-8465-FC3D8C7645D3} = {1035B36B-8194-43D4-8A4A-992D962CD1D8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8A80931C-B951-4EF2-AC79-457E73118E5F}
Expand Down
53 changes: 5 additions & 48 deletions src/Thor.Service/EventBus/ChannelEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,69 +7,26 @@ namespace Thor.Service.EventBus;
public sealed class ChannelEventHandler : IEventHandler<ChatLogger>, IDisposable
{
private readonly ILogger<ChannelEventHandler> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IServiceScope _scope;
private readonly Timer _timer;
private readonly Channel<ChatLogger> _events = Channel.CreateUnbounded<ChatLogger>();
private readonly SemaphoreSlim _semaphore = new(1, 1);

private const int Interval = 30000;
private readonly LoggerDbContext _loggerDbContext;

public ChannelEventHandler(IServiceProvider serviceProvider, ILogger<ChannelEventHandler> logger)
{
_logger = logger;
_scope = serviceProvider.CreateScope();
_serviceProvider = _scope.ServiceProvider;
_timer = new Timer(Flush, null, Interval, Interval);
_loggerDbContext = _scope.ServiceProvider.GetRequiredService<LoggerDbContext>();
}

public async Task HandleAsync(ChatLogger @event)
{
await _events.Writer.WriteAsync(@event);
}

private async void Flush(object state)
{
await FlushAsync();
}

private async Task FlushAsync()
{
await _semaphore.WaitAsync();

try
{
var currentEvents = new List<ChatLogger>();

_logger.LogInformation("Flushing events to database");

while (_events.Reader.TryRead(out var value))
{
currentEvents.Add(value);
}

_logger.LogInformation($"Flushing {currentEvents.Count} events to database");

if (!currentEvents.Any())
{
return;
}

var loggerDbContext = _serviceProvider.GetRequiredService<LoggerDbContext>();
await loggerDbContext.Loggers.AddRangeAsync(currentEvents);
await loggerDbContext.SaveChangesAsync();
await _loggerDbContext.Loggers.AddAsync(@event);
await _loggerDbContext.SaveChangesAsync();

_logger.LogInformation("Events flushed to database");
}
finally
{
_semaphore.Release();
}
_logger.LogInformation("ChatLogger event received");
}

public void Dispose()
{
_timer.Dispose();
_scope.Dispose();
}
}
13 changes: 12 additions & 1 deletion src/Thor.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using Thor.Service.Service;
using Thor.SparkDesk.Extensions;
using Thor.Abstractions.Chats.Dtos;
using Thor.RabbitMQEvent;

try
{
Expand Down Expand Up @@ -62,12 +63,22 @@
builder.Services.AddRedisMemory(CacheOptions.ConnectionString);
}

var rabbitMQConnectionString = builder.Configuration["RabbitMQ:ConnectionString"];
if (!string.IsNullOrEmpty(rabbitMQConnectionString))
{
builder.Services.AddRabbitMQEventBus(builder.Configuration);
}
else
{
builder.Services.AddLocalEventBus();
}


builder.Services.AddMvcCore().AddApiExplorer();
builder.Services
.AddEndpointsApiExplorer()
.AddSwaggerGen()
.AddSingleton<IEventHandler<ChatLogger>, ChannelEventHandler>()
.AddLocalEventBus()
.AddCustomAuthentication()
.AddHttpContextAccessor()
.AddTransient<ProductService>()
Expand Down
1 change: 1 addition & 0 deletions src/Thor.Service/Thor.Service.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<ProjectReference Include="..\framework\Thor.Core\Thor.Core.csproj" />
<ProjectReference Include="..\framework\Thor.LocalEvent\Thor.LocalEvent.csproj" />
<ProjectReference Include="..\framework\Thor.LocalMemory.Cache\Thor.LocalMemory.Cache.csproj" />
<ProjectReference Include="..\framework\Thor.RabbitMQEvent\Thor.RabbitMQEvent.csproj" />
<ProjectReference Include="..\framework\Thor.RedisMemory.Cache\Thor.RedisMemory.Cache.csproj" />
<ProjectReference Include="..\Thor.Abstractions\Thor.Abstractions.csproj" />
</ItemGroup>
Expand Down
4 changes: 4 additions & 0 deletions src/Thor.Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"DBType": "sqlite"
// sqlite | [postgresql,pgsql] | [sqlserver,mssql] | mysql
},
"RabbitMQ": {
"ConnectionString": ""
// amqp://token:dd666666@localhost:5672
},
"Chat": {
"Master": "",
"Shared": {
Expand Down
14 changes: 14 additions & 0 deletions src/framework/Thor.RabbitMQEvent/EventEto.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Thor.RabbitMQEvent;

public class EventEto
{
public string FullName { get; set; }

public byte[] Data { get; set; }

public EventEto(string fullName, byte[] data)
{
FullName = fullName;
Data = data;
}
}
16 changes: 16 additions & 0 deletions src/framework/Thor.RabbitMQEvent/RabbitMQEventBus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Text.Json;
using Raccoon.Stack.Rabbit;
using Thor.BuildingBlocks.Data;

namespace Thor.RabbitMQEvent;

public class RabbitMQEventBus<TEvent>(RabbitClient rabbitClient) : IEventBus<TEvent> where TEvent : class
{
public async ValueTask PublishAsync(TEvent @event)
{
var eto = new EventEto(@event.GetType().FullName, JsonSerializer.SerializeToUtf8Bytes(@event));

await rabbitClient.PublishAsync("Thor:EventBus:exchange", "Thor:EventBus:key",
JsonSerializer.SerializeToUtf8Bytes(eto));
}
}
42 changes: 42 additions & 0 deletions src/framework/Thor.RabbitMQEvent/RabbitMQEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Reflection;
using System.Text.Json;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client.Events;
using Raccoon.Stack.Rabbit;
using Raccoon.Stack.Rabbit.Handler;
using Thor.BuildingBlocks.Data;

namespace Thor.RabbitMQEvent;

public class RabbitMQEventHandler : IRabbitHandler
{
public bool Enable(ConsumeOptions options)
{
return options.Queue.Equals("Thor:EventBus", StringComparison.OrdinalIgnoreCase);
}

public async Task Handle(IServiceProvider sp, BasicDeliverEventArgs args, ConsumeOptions options)
{
var eto = JsonSerializer.Deserialize<EventEto>(args.Body.ToArray());

// type : Thor.Service.Domain.ChatLogger

var type = Assembly.GetEntryAssembly()?.GetType(eto.FullName);

if (type == null)
{
return;
}

var @event = JsonSerializer.Deserialize(eto.Data, type);

// IEventHandler<ChatLogger>
var handlerType = typeof(IEventHandler<>).MakeGenericType(type);

var handler = sp.GetRequiredService(handlerType);

var method = handlerType.GetMethod("HandleAsync");

await (Task)method.Invoke(handler, new object[] { @event });
}
}
45 changes: 45 additions & 0 deletions src/framework/Thor.RabbitMQEvent/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using Raccoon.Stack.Rabbit;
using Thor.BuildingBlocks.Data;

namespace Thor.RabbitMQEvent;

public static class ServiceCollectionExtensions
{
public static IServiceCollection AddRabbitMQEventBus(this IServiceCollection services, IConfiguration configuration)
{
// 是否启用RabbitMQ
var connection = configuration["RabbitMQ:ConnectionString"];
if (string.IsNullOrWhiteSpace(connection))
{
return services;
}

services
.AddSingleton(typeof(IEventBus<>), typeof(RabbitMQEventBus<>));

services.AddRabbitBoot((options =>
{
options.ConnectionString = connection;
options.Consumes =
[
new ConsumeOptions
{
AutoAck = false,
FetchCount = 10,
Queue = "Thor:EventBus",
Declaration = (declaration =>
{
declaration.QueueDeclareAsync("Thor:EventBus", true);
declaration.ExchangeDeclareAsync("Thor:EventBus:exchange", ExchangeType.Direct, true);
declaration.QueueBindAsync("Thor:EventBus", "Thor:EventBus:exchange", "Thor:EventBus:key");
})
}
];
}), typeof(RabbitMQEventBus<>).Assembly);

return services;
}
}
17 changes: 17 additions & 0 deletions src/framework/Thor.RabbitMQEvent/Thor.RabbitMQEvent.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Raccoon.Stack.Rabbit" Version="0.0.1.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Thor.BuildingBlocks.Event\Thor.BuildingBlocks.Event.csproj" />
</ItemGroup>

</Project>

0 comments on commit d058b35

Please sign in to comment.