From c3d70a7a8f2e26c85d3770185aa0e1603189da05 Mon Sep 17 00:00:00 2001 From: xujianhai666 Date: Wed, 24 Jul 2019 21:14:05 +0800 Subject: [PATCH] add failback_cluster_invoker && failsafe_cluster_invoker. resolve #135 --- .../cluster_impl/failback_cluster_invoker.go | 154 ++++++++++-------- cluster/cluster_impl/failback_cluster_test.go | 131 +++++++++++++++ .../cluster_impl/failsafe_cluster_invoker.go | 7 + cluster/cluster_impl/failsafe_cluster_test.go | 125 ++++++-------- 4 files changed, 270 insertions(+), 147 deletions(-) diff --git a/cluster/cluster_impl/failback_cluster_invoker.go b/cluster/cluster_impl/failback_cluster_invoker.go index 07792de0fd..bede154d3c 100644 --- a/cluster/cluster_impl/failback_cluster_invoker.go +++ b/cluster/cluster_impl/failback_cluster_invoker.go @@ -19,11 +19,14 @@ package cluster_impl import ( "container/list" - perrors "github.com/pkg/errors" "sync" "time" ) +import ( + perrors "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go/cluster" "github.com/apache/dubbo-go/common/constant" @@ -31,22 +34,25 @@ import ( "github.com/apache/dubbo-go/protocol" ) +/** + * When fails, record failure requests and schedule for retry on a regular interval. + * Especially useful for services of notification. 
+ * + * Failback + */ type failbackClusterInvoker struct { baseClusterInvoker -} -var ( - retries int64 - failbackTasks int64 ticker *time.Ticker - once sync.Once - lock sync.Mutex + maxRetries int64 + failbackTasks int64 taskList *Queue -) +} func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { invoker := &failbackClusterInvoker{ baseClusterInvoker: newBaseClusterInvoker(directory), + taskList: newQueue(), } retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES) if retriesConfig <= 0 { @@ -56,11 +62,50 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker { if failbackTasksConfig <= 0 { failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS } - retries = retriesConfig - failbackTasks = failbackTasksConfig + invoker.maxRetries = retriesConfig + invoker.failbackTasks = failbackTasksConfig + go invoker.process() return invoker } +func (invoker *failbackClusterInvoker) process() { + invoker.ticker = time.NewTicker(time.Second * 1) + for range invoker.ticker.C { + check := true + // check each timeout task and re-run + for check { + value := invoker.taskList.peek() + if value == nil { + break + } + + retryTask := value.(*retryTimerTask) + if time.Since(retryTask.lastT).Seconds() < 5 { + break + } + + invoker.taskList.pop() + + invoked := []protocol.Invoker{} + invoked = append(invoked, retryTask.lastInvoker) + retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked) + var result protocol.Result + result = retryInvoker.Invoke(retryTask.invocation) + if result.Error() != nil { + perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. 
The exception: %v.", + retryTask.invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error()) + retryTask.retries++ + if retryTask.retries > invoker.maxRetries { + perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v", + retryTask.retries, retryTask.invocation) + } else { + invoker.taskList.push(retryTask) + } + } + } + } +} + func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { invokers := invoker.directory.List(invocation) @@ -94,9 +139,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr if result.Error() != nil { // add retry ticker task - addFailed(loadbalance, invocation, invokers, invoker) + timerTask := newRetryTimerTask(loadbalance, invocation, invokers, invoker, invoker.maxRetries, 5) + invoker.taskList.push(timerTask) + perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.", - methodName, invoker.GetUrl().Service(), result.Error().Error()) + methodName, url.Service(), result.Error().Error()) // ignore return &protocol.RPCResult{} } @@ -105,58 +152,10 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr } func (invoker *failbackClusterInvoker) Destroy() { - //this is must atom operation - if invoker.destroyed.CAS(false, true) { - invoker.directory.Destroy() - } - // stop ticker - ticker.Stop() -} + invoker.baseClusterInvoker.Destroy() -func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, - invoker *failbackClusterInvoker) { - initSingleTickerTaskInstance() - // init one retryTimerTask - timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5) - taskList.push(timerTask) - // process ticker task - go func() { - <-ticker.C - value := taskList.pop() - if value == nil { - return - } - - retryTask := value.(retryTimerTask) - invoked := []protocol.Invoker{} 
- invoked = append(invoked, retryTask.lastInvoker) - retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, - invoked) - var result protocol.Result - result = retryInvoker.Invoke(retryTask.invocation) - if result.Error() != nil { - perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.", - invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error()) - retryTask.retries++ - if retryTask.retries > retries { - perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v", - retries, invocation) - } else { - taskList.push(retryTask) - } - } - }() -} - -func initSingleTickerTaskInstance() { - once.Do(func() { - newTickerTask() - }) -} - -func newTickerTask() { - ticker = time.NewTicker(time.Second * 1) - taskList = newQueue() + // stop ticker + invoker.ticker.Stop() } type retryTimerTask struct { @@ -165,7 +164,7 @@ type retryTimerTask struct { invokers []protocol.Invoker lastInvoker *failbackClusterInvoker retries int64 - tick int64 + lastT time.Time } func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker, @@ -176,31 +175,44 @@ func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invo invokers: invokers, lastInvoker: lastInvoker, retries: retries, - tick: tick, + lastT: time.Now(), } } type Queue struct { data *list.List + sync.Locker } func newQueue() *Queue { - q := new(Queue) - q.data = list.New() + q := &Queue{ + data: list.New(), + Locker: new(sync.Mutex), + } return q } func (q *Queue) push(v interface{}) { - defer lock.Unlock() - lock.Lock() + defer q.Unlock() + q.Lock() q.data.PushFront(v) } func (q *Queue) pop() interface{} { - defer lock.Unlock() - lock.Lock() + defer q.Unlock() + q.Lock() iter := q.data.Back() v := iter.Value q.data.Remove(iter) return v } + +func (q *Queue) peek() interface{} { + defer q.Unlock() + q.Lock() + iter := 
q.data.Back() + if iter == nil { + return nil + } + return iter.Value +} diff --git a/cluster/cluster_impl/failback_cluster_test.go b/cluster/cluster_impl/failback_cluster_test.go index 57dec473c2..d482dc77fa 100644 --- a/cluster/cluster_impl/failback_cluster_test.go +++ b/cluster/cluster_impl/failback_cluster_test.go @@ -17,5 +17,136 @@ limitations under the License. package cluster_impl +import ( + "context" + "testing" + "time" +) +import ( + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestQueue(t *testing.T) { + q := newQueue() + d0 := retryTimerTask{retries: 0} + d1 := retryTimerTask{retries: 1} + d2 := retryTimerTask{retries: 2} + q.push(d0) + q.push(d1) + q.push(d2) + + d := q.peek() + assert.Equal(t, d, d0) + d = q.pop() + assert.Equal(t, d, d0) + d = q.pop() + assert.Equal(t, d, d1) + d = q.pop() + assert.Equal(t, d, d2) +} + +var ( + failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +// register_failback registers the random load balance and returns a failback cluster invoker joined over a static directory holding the given mock invoker.
+func register_failback(t *testing.T, invoker *protocol.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failbackCluster := NewFailbackCluster() + + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl) + + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failbackCluster.Join(staticDir) + return clusterInvoker +} + +func Test_FailbackSuceess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +func Test_FailbackRetryOneSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // failed at first + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error"),} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + + // success second + mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockSuccResult) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + // ensure the retry task has been executed + time.Sleep(6 * time.Second) + invoker.EXPECT().Destroy().Return() + + clusterInvoker.Destroy() + + assert.Equal(t, 0, clusterInvoker.taskList.data.Len()) +} + +func 
Test_FailbackRetryFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker) + + invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes() + + // failed always + mockFailedResult := &protocol.RPCResult{Err: perrors.New("error"),} + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).AnyTimes() + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Nil(t, result.Error()) + assert.Nil(t, result.Result()) + assert.Equal(t, 0, len(result.Attachments())) + + time.Sleep(2 * time.Second) // for retry work. + invoker.EXPECT().Destroy().Return() + + clusterInvoker.Destroy() + + time.Sleep( 1 * time.Second) // for retryTimerTask thrown back to queue + + assert.Equal(t, 1, clusterInvoker.taskList.data.Len()) +} diff --git a/cluster/cluster_impl/failsafe_cluster_invoker.go b/cluster/cluster_impl/failsafe_cluster_invoker.go index a006e918a2..9d5c55e526 100644 --- a/cluster/cluster_impl/failsafe_cluster_invoker.go +++ b/cluster/cluster_impl/failsafe_cluster_invoker.go @@ -28,6 +28,13 @@ import ( "github.com/apache/dubbo-go/protocol" ) +/** + * When invoke fails, log the error message and ignore this error by returning an empty Result. + * Usually used to write audit logs and other operations + * + * Fail-safe + * + */ type failsafeClusterInvoker struct { baseClusterInvoker } diff --git a/cluster/cluster_impl/failsafe_cluster_test.go b/cluster/cluster_impl/failsafe_cluster_test.go index 7f050067b2..939b18160f 100644 --- a/cluster/cluster_impl/failsafe_cluster_test.go +++ b/cluster/cluster_impl/failsafe_cluster_test.go @@ -18,104 +18,77 @@ limitations under the License. 
package cluster_impl import ( - "context" - "fmt" - "net/url" - "testing" - - "github.com/apache/dubbo-go/common/logger" - "github.com/stretchr/testify/assert" + "context" + "testing" ) import ( - perrors "github.com/pkg/errors" + "github.com/golang/mock/gomock" + perrors "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) import ( - "github.com/apache/dubbo-go/cluster/directory" - "github.com/apache/dubbo-go/cluster/loadbalance" - "github.com/apache/dubbo-go/common" - "github.com/apache/dubbo-go/common/extension" - "github.com/apache/dubbo-go/protocol" - "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" ) -///////////////////////////// -// mock invoker -///////////////////////////// +var ( + failsafeUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) -var returnsuccess bool +// register_failsafe registers the random load balance and returns a failsafe cluster invoker joined over a static directory holding the given mock invoker.
+func register_failsafe(t *testing.T, invoker *protocol.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + failsafeCluster := NewFailsafeCluster() -type FailsafeMockInvoker struct { - url common.URL - available bool - destroyed bool -} + invokers := []protocol.Invoker{} + invokers = append(invokers, invoker) -func NewFailsafeMockInvoker(url common.URL) *FailsafeMockInvoker { - return &FailsafeMockInvoker{ - url: url, - available: true, - destroyed: false, - } -} + invoker.EXPECT().GetUrl().Return(failsafeUrl) -func (bi *FailsafeMockInvoker) GetUrl() common.URL { - return bi.url + staticDir := directory.NewStaticDirectory(invokers) + clusterInvoker := failsafeCluster.Join(staticDir) + return clusterInvoker } -func (bi *FailsafeMockInvoker) IsAvailable() bool { - return bi.available -} +func Test_FailSafeInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() -func (bi *FailsafeMockInvoker) IsDestroyed() bool { - return bi.destroyed -} + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) -func (bi *FailsafeMockInvoker) Invoke(invocation protocol.Invocation) protocol.Result { - var err error - if !returnsuccess { - err = perrors.New("error") - } - result := &protocol.RPCResult{Err: err, Rest: rest{tried: 0, success: returnsuccess}} + invoker.EXPECT().GetUrl().Return(failsafeUrl) - return result -} + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} -func (bi *FailsafeMockInvoker) Destroy() { - logger.Infof("Destroy invoker: %v", bi.GetUrl().String()) - bi.destroyed = true - bi.available = false + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + res := result.Result().(rest) + assert.True(t, res.success) } -func failSafeInvoke(t *testing.T, urlParam url.Values) protocol.Result { - extension.SetLoadbalance("random", 
loadbalance.NewRandomLoadBalance) - failsafeCluster := NewFailsafeCluster() +func Test_FailSafeInvokeFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() - invokers := []protocol.Invoker{} - for i := 0; i < 10; i++ { - url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/com.ikurento.user.UserProvider", i), common.WithParams(urlParam)) - invokers = append(invokers, NewFailsafeMockInvoker(url)) - } + invoker := protocol.NewMockInvoker(ctrl) + clusterInvoker := register_failsafe(t, invoker) - staticDir := directory.NewStaticDirectory(invokers) - clusterInvoker := failsafeCluster.Join(staticDir) - return clusterInvoker.Invoke(&invocation.RPCInvocation{}) -} + invoker.EXPECT().GetUrl().Return(failsafeUrl) -func Test_FailSafeInvokeSuccess(t *testing.T) { - returnsuccess = true - urlParams := url.Values{} - result := failSafeInvoke(t, urlParams) - assert.NoError(t, result.Error()) - res := result.Result().(rest) - assert.True(t, res.success) -} + mockResult := &protocol.RPCResult{Err: perrors.New("error"),} -func Test_FailSafeInvokeFail(t *testing.T) { - returnsuccess = false - urlParams := url.Values{} - result := failSafeInvoke(t, urlParams) - assert.NoError(t, result.Error()) - assert.Nil(t, result.Result()) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + + assert.NoError(t, result.Error()) + assert.Nil(t, result.Result()) }