Skip to content

Commit

Permalink
优化异常逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
239573049 committed Dec 7, 2024
1 parent d71b21e commit 7c408e9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 15 deletions.
20 changes: 13 additions & 7 deletions src/Thor.Service/Infrastructure/CircuitBreaker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using Thor.Abstractions.Exceptions;

namespace Thor.Core;
namespace Thor.Service.Infrastructure;

public class CircuitBreaker(int failureThreshold, TimeSpan openTimeSpan)
{
Expand All @@ -9,7 +9,8 @@ public class CircuitBreaker(int failureThreshold, TimeSpan openTimeSpan)
private CircuitBreakerState _state = CircuitBreakerState.Closed;
private DateTime _nextRetryTime = DateTime.MinValue;

public async ValueTask ExecuteAsync(Func<Task> action, int maxAttempts, int delay = 500)
public async ValueTask<Exception?> ExecuteAsync(Func<Task> action, int maxAttempts, int delay = 500,
Action<Exception>? errorAction = null)
{
if (_state == CircuitBreakerState.Open && DateTime.UtcNow >= _nextRetryTime)
{
Expand All @@ -36,16 +37,17 @@ public async ValueTask ExecuteAsync(Func<Task> action, int maxAttempts, int dela
_state = CircuitBreakerState.Closed;
}

return; // Exit if action is successful
}
catch (ThorRateLimitException)
{
throw new ThorRateLimitException();
return null; // Exit if action is successful
}
catch (Exception ex)
{
Console.WriteLine($"Attempt {attempts} failed: {ex}");

if (ex is ThorRateLimitException or UnauthorizedAccessException)
{
return ex;
}

_failureCount++;
if (_failureCount >= failureThreshold)
{
Expand All @@ -57,10 +59,14 @@ public async ValueTask ExecuteAsync(Func<Task> action, int maxAttempts, int dela
{
throw; //
}

errorAction?.Invoke(ex);
}

await Task.Delay(delay); // 重试延迟,避免瞬间大量请求
}

return null;
}

private enum CircuitBreakerState
Expand Down
17 changes: 15 additions & 2 deletions src/Thor.Service/Service/ChannelService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
using Thor.Abstractions.Chats;
using Thor.Abstractions.Chats.Consts;
using Thor.Abstractions.Chats.Dtos;
using Thor.Abstractions.Exceptions;
using Thor.Abstractions.ObjectModels.ObjectModels.RequestModels;
using Thor.AzureOpenAI;
using Thor.Claude;
using Thor.Core;
using Thor.Hunyuan;
using Thor.OpenAI;
using Thor.Service.Infrastructure;
using Thor.SparkDesk;

namespace Thor.Service.Service;
Expand All @@ -31,7 +33,8 @@ public sealed class ChannelService(IServiceProvider serviceProvider, IMapper map
public async Task<ChatChannel[]> GetChannelsAsync()
{
return await cache.GetOrCreateAsync(CacheKey,
async () => { return await DbContext.Channels.AsNoTracking().Where(x => !x.Disable).ToArrayAsync(); }, isLock: false);
async () => { return await DbContext.Channels.AsNoTracking().Where(x => !x.Disable).ToArrayAsync(); },
isLock: false);
}

/// <summary>
Expand Down Expand Up @@ -255,12 +258,22 @@ await DbContext.Channels

ThorChatCompletionsResponse response = null;

await circuitBreaker.ExecuteAsync(async () =>
var result = await circuitBreaker.ExecuteAsync(async () =>
{
response = await chatCompletionsService.ChatCompletionsAsync(chatRequest, platformOptions,
token.Token);
}, 3).ConfigureAwait(false);

if (result is ThorRateLimitException)
{
throw new ChannelException("请求过于频繁,请稍后再试");
}

if (result is UnauthorizedAccessException)
{
throw new ChannelException("未授权");
}

sw.Stop();

// 更新渠道测试响应时间
Expand Down
49 changes: 44 additions & 5 deletions src/Thor.Service/Service/ChatService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Thor.Abstractions.Realtime.Dto;
using Thor.Core;
using Thor.Service.Extensions;
using Thor.Service.Infrastructure;

namespace Thor.Service.Service;

Expand Down Expand Up @@ -496,6 +497,7 @@ await userService.ConsumeAsync(user!.Id, (long)quota, requestToken, token?.Key,
}
catch (ThorRateLimitException)
{
logger.LogWarning("对话模型请求限流:{rateLimit}", rateLimit);
rateLimit++;
// TODO:限流重试次数
if (rateLimit > 3)
Expand Down Expand Up @@ -804,9 +806,19 @@ await userService.ConsumeAsync(user!.Id, (long)quota, (int)requestToken, token?.

ThorChatCompletionsResponse result = null;

await circuitBreaker.ExecuteAsync(
var err = await circuitBreaker.ExecuteAsync(
async () => { result = await openService.ChatCompletionsAsync(request, platformOptions); }, 3);

if (err is ThorRateLimitException)
{
throw new ThorRateLimitException();
}

if (err is UnauthorizedAccessException)
{
throw new UnauthorizedAccessException("未授权");
}

await context.Response.WriteAsJsonAsync(result);

if (result?.Usage?.PromptTokens is not null && result.Usage.PromptTokens > 0)
Expand Down Expand Up @@ -839,9 +851,19 @@ await circuitBreaker.ExecuteAsync(

ThorChatCompletionsResponse result = null;

await circuitBreaker.ExecuteAsync(
var err = await circuitBreaker.ExecuteAsync(
async () => { result = await openService.ChatCompletionsAsync(request, platformOptions); }, 3);

if (err is ThorRateLimitException)
{
throw new ThorRateLimitException();
}

if (err is UnauthorizedAccessException)
{
throw new UnauthorizedAccessException("未授权");
}

await context.Response.WriteAsJsonAsync(result);

responseToken = TokenHelper.GetTokens(result.Choices.FirstOrDefault()?.Delta.Content ?? string.Empty);
Expand Down Expand Up @@ -894,8 +916,6 @@ await circuitBreaker.ExecuteAsync(

var responseMessage = new StringBuilder();

context.SetEventStreamHeaders();

if (input.Messages.Any(x => x.Contents != null))
{
requestToken = TokenHelper.GetTotalTokens(input?.Messages.Where(x => x.Contents != null)
Expand Down Expand Up @@ -964,12 +984,21 @@ await circuitBreaker.ExecuteAsync(
.Select(x => x.Function!.Type!).ToArray());
}

// 是否第一次输出
bool isFirst = true;

var circuitBreaker = new CircuitBreaker(3, TimeSpan.FromSeconds(10));
await circuitBreaker.ExecuteAsync(
var err = await circuitBreaker.ExecuteAsync(
async () =>
{
await foreach (var item in openService.StreamChatCompletionsAsync(input, platformOptions))
{
if (isFirst)
{
context.SetEventStreamHeaders();
isFirst = false;
}

if (item.Error != null)
{
await context.WriteStreamErrorAsync(item.Error.Message);
Expand Down Expand Up @@ -1006,6 +1035,16 @@ await circuitBreaker.ExecuteAsync(
}
}, 3);

if (err is ThorRateLimitException)
{
throw new ThorRateLimitException();
}

if (err is UnauthorizedAccessException)
{
throw new UnauthorizedAccessException();
}

await context.WriteAsEventStreamEndAsync();

var responseToken = TokenHelper.GetTokens(responseMessage.ToString());
Expand Down
2 changes: 1 addition & 1 deletion src/Thor.Service/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
}
},
"Cache": {
"Type": "Redis",
"Type": "Memory",
"ConnectionString": "localhost:6379"
},
"Jwt": {
Expand Down

0 comments on commit 7c408e9

Please sign in to comment.