-
Notifications
You must be signed in to change notification settings - Fork 0
/
ChannelCircuitBreaker.cs
122 lines (100 loc) · 3.46 KB
/
ChannelCircuitBreaker.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Grpc.Core;
using UniRx.Async;
namespace _Script.Application.Utility
{
public class ChannelCircuitBreaker<T> where T : ClientBase<T>
{
private enum CircuitState { Open, Closed }
public bool IsConnected => channel.State == ChannelState.Idle;
public DateTime LastFailureTime { get; private set; }
private int failedThreshold;
private int failureCount;
private float invocationTimeout;
private Action<Channel> connectAction;
private Channel channel;
private CircuitState State => failureCount >= failedThreshold ? CircuitState.Open : CircuitState.Closed;
public ChannelCircuitBreaker(Channel channel,int failedThreshold = 5, float invocationTimeout = 0.2f)
{
this.channel = channel;
this.failedThreshold = failedThreshold;
this.invocationTimeout = invocationTimeout;
}
public ChannelCircuitBreaker<T> Create(in Action<Channel> func)
{
connectAction = Unsafe.AsRef(func);
return this;
}
public async UniTask Execute()
{
if(channel.State == ChannelState.Ready)
return;
switch (State)
{
case CircuitState.Open:
{
LastFailureTime = DateTime.Now;
throw new TimeoutException("接続できません。");
}
case CircuitState.Closed:
try
{
await CheckConnect();
Reset();
}
catch
{
RecordFailure();
await Execute();
}
break;
default:
throw new ArgumentOutOfRangeException();
}
async UniTask CheckConnect()
{
await channel
.ConnectAsync()
.Timeout(TimeSpan.FromSeconds(invocationTimeout));
connectAction(channel);
}
}
public ChannelCircuitBreaker<T> FailedThreshold(int threshold)
{
if (threshold > 0)
failedThreshold = threshold;
return this;
}
public ChannelCircuitBreaker<T> InvocationTimeout(float second)
{
if (second > 0f)
invocationTimeout = second;
return this;
}
public async UniTask Retry()
{
if(State == CircuitState.Closed)
return;
Reset();
await Execute();
}
private void RecordFailure() => ++failureCount;
private void Reset() => failureCount = 0;
}
public static class TaskExtensions
{
public static async Task Timeout(this Task task, TimeSpan timeout)
{
var delay = Task.Delay(timeout);
if (await Task.WhenAny(task, delay) == delay)
throw new TimeoutException();
}
public static async Task<T> Timeout<T>(this Task<T> task, TimeSpan timeout)
{
await ((Task)task).Timeout(timeout);
return await task;
}
}
}