Skip to content

Commit

Permalink
add failback_cluster_invoker && failsafe_cluster_invoker. resolve #135
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Jul 24, 2019
1 parent 8b66663 commit 63432a2
Show file tree
Hide file tree
Showing 9 changed files with 2,501 additions and 72 deletions.
154 changes: 83 additions & 71 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,40 @@ package cluster_impl

import (
"container/list"
perrors "github.com/pkg/errors"
"sync"
"time"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)

/**
* When an invocation fails, record the failed request and schedule it for retry at a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
type failbackClusterInvoker struct {
baseClusterInvoker
}

var (
retries int64
failbackTasks int64
ticker *time.Ticker
once sync.Once
lock sync.Mutex
maxRetries int64
failbackTasks int64
taskList *Queue
)
}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
taskList: newQueue(),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
Expand All @@ -56,11 +62,50 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
retries = retriesConfig
failbackTasks = failbackTasksConfig
invoker.maxRetries = retriesConfig
invoker.failbackTasks = failbackTasksConfig
go invoker.process()
return invoker
}

// process is the background retry loop of the failback invoker.
//
// It creates a one-second ticker and, on every tick, drains the tail of
// taskList: each task whose lastT is at least five seconds old is removed,
// re-dispatched through doSelect (with the task's lastInvoker passed as the
// already-invoked list) and invoked again. A task that fails again has its
// retry counter incremented; it is dropped once the counter exceeds
// maxRetries, otherwise its wait window is restarted and it is queued again.
//
// NOTE(review): Ticker.Stop does not close the ticker channel, so this loop
// does not return after the ticker is stopped; the goroutine stays parked on
// the channel. A dedicated stop signal would be needed to end it.
func (invoker *failbackClusterInvoker) process() {
	invoker.ticker = time.NewTicker(time.Second * 1)
	for range invoker.ticker.C {
		// Re-run every task whose wait window has expired.
		for {
			value := invoker.taskList.peek()
			if value == nil {
				break
			}

			retryTask := value.(*retryTimerTask)
			// Tasks are queued in lastT order, so when the oldest one is not
			// due yet there is nothing else to do on this tick.
			if time.Since(retryTask.lastT).Seconds() < 5 {
				break
			}

			invoker.taskList.pop()

			invoked := []protocol.Invoker{retryTask.lastInvoker}
			retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
			result := retryInvoker.Invoke(retryTask.invocation)
			if result.Error() == nil {
				continue
			}

			// NOTE(review): perrors.Errorf only builds an error value and the
			// result is discarded here, so nothing is actually reported;
			// presumably a logger call was intended.
			perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
				retryTask.invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
			retryTask.retries++
			if retryTask.retries > invoker.maxRetries {
				perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
					retryTask.retries, retryTask.invocation)
				continue
			}

			// Restart the wait window before re-queueing. Without this the
			// task is immediately due again and every remaining retry would
			// be spent back-to-back within this same tick.
			retryTask.lastT = time.Now()
			invoker.taskList.push(retryTask)
		}
	}
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

invokers := invoker.directory.List(invocation)
Expand Down Expand Up @@ -94,9 +139,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr

if result.Error() != nil {
// add retry ticker task
addFailed(loadbalance, invocation, invokers, invoker)
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, invoker, invoker.maxRetries, 5)
invoker.taskList.push(timerTask)

perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
methodName, invoker.GetUrl().Service(), result.Error().Error())
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
Expand All @@ -105,58 +152,10 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
}

func (invoker *failbackClusterInvoker) Destroy() {
// this must be an atomic operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
// stop ticker
ticker.Stop()
}
invoker.baseClusterInvoker.Destroy()

func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
invoker *failbackClusterInvoker) {
initSingleTickerTaskInstance()
// init one retryTimerTask
timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
taskList.push(timerTask)
// process ticker task
go func() {
<-ticker.C
value := taskList.pop()
if value == nil {
return
}

retryTask := value.(retryTimerTask)
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
retryTask.retries++
if retryTask.retries > retries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retries, invocation)
} else {
taskList.push(retryTask)
}
}
}()
}

func initSingleTickerTaskInstance() {
once.Do(func() {
newTickerTask()
})
}

func newTickerTask() {
ticker = time.NewTicker(time.Second * 1)
taskList = newQueue()
// stop ticker
invoker.ticker.Stop()
}

type retryTimerTask struct {
Expand All @@ -165,7 +164,7 @@ type retryTimerTask struct {
invokers []protocol.Invoker
lastInvoker *failbackClusterInvoker
retries int64
tick int64
lastT time.Time
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
Expand All @@ -176,31 +175,44 @@ func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invo
invokers: invokers,
lastInvoker: lastInvoker,
retries: retries,
tick: tick,
lastT: time.Now(),
}
}

// Queue is a FIFO container backed by a doubly linked list. Every operation
// takes the embedded Locker, so a Queue built by newQueue can be shared
// between goroutines.
type Queue struct {
	data *list.List
	sync.Locker
}

// newQueue returns an empty Queue guarded by its own mutex.
func newQueue() *Queue {
	return &Queue{
		data:   list.New(),
		Locker: new(sync.Mutex),
	}
}

// push adds v as the newest element of the queue.
func (q *Queue) push(v interface{}) {
	q.Lock()
	defer q.Unlock()
	q.data.PushFront(v)
}

// pop removes and returns the oldest element of the queue.
// It returns nil when the queue is empty.
func (q *Queue) pop() interface{} {
	q.Lock()
	defer q.Unlock()
	iter := q.data.Back()
	if iter == nil {
		// Back reports nil for an empty list; dereferencing it would panic.
		return nil
	}
	return q.data.Remove(iter)
}

// peek returns the oldest element of the queue without removing it.
// It returns nil when the queue is empty.
func (q *Queue) peek() interface{} {
	q.Lock()
	defer q.Unlock()
	iter := q.data.Back()
	if iter == nil {
		return nil
	}
	return iter.Value
}
152 changes: 152 additions & 0 deletions cluster/cluster_impl/failback_cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster_impl

import (
"context"
"testing"
"time"
)

import (
"github.com/golang/mock/gomock"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)

// TestQueue verifies that Queue hands elements back in insertion order, that
// peek leaves the queue untouched, and that peek on a drained queue yields nil.
func TestQueue(t *testing.T) {
	q := newQueue()
	d0 := retryTimerTask{retries: 0}
	d1 := retryTimerTask{retries: 1}
	d2 := retryTimerTask{retries: 2}
	q.push(d0)
	q.push(d1)
	q.push(d2)

	// peek reports the oldest element and must not remove it.
	// assert.Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	assert.Equal(t, d0, q.peek())
	assert.Equal(t, 3, q.data.Len())

	// pop returns the elements oldest-first.
	assert.Equal(t, d0, q.pop())
	assert.Equal(t, d1, q.pop())
	assert.Equal(t, d2, q.pop())

	// A drained queue has nothing to peek at.
	assert.Nil(t, q.peek())
}

var (
	// failbackUrl is the URL the mock invokers in the failback tests are
	// configured to return from GetUrl. The error result of common.NewURL is
	// discarded.
	failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
)

// register_failback builds a failback cluster invoker around the given mock.
// It registers the random load balancer under the name "random", expects one
// GetUrl call on the mock (answered with failbackUrl), wraps the mock in a
// static directory and joins that directory with a new failback cluster.
func register_failback(t *testing.T, invoker *protocol.MockInvoker) protocol.Invoker {
	extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
	cluster := NewFailbackCluster()

	// The expectation has to be in place before the directory is created.
	invoker.EXPECT().GetUrl().Return(failbackUrl)

	members := []protocol.Invoker{invoker}
	return cluster.Join(directory.NewStaticDirectory(members))
}

// Test_FailbackSuceess checks that when the underlying invoker succeeds on
// the first attempt, the failback invoker returns that result as is.
func Test_FailbackSuceess(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mockInvoker := protocol.NewMockInvoker(mockCtrl)
	failback := register_failback(t, mockInvoker).(*failbackClusterInvoker)

	mockInvoker.EXPECT().GetUrl().Return(failbackUrl).Times(1)

	// The single Invoke call on the mock succeeds.
	want := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
	mockInvoker.EXPECT().Invoke(gomock.Any()).Return(want)

	got := failback.Invoke(&invocation.RPCInvocation{})
	assert.Equal(t, want, got)
}

// Test_FailbackRetryOneSuccess covers the case where the first invocation
// fails and the background retry succeeds: the caller gets an empty result
// immediately and, once the retry has run, no task is left in the queue.
func Test_FailbackRetryOneSuccess(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mockInvoker := protocol.NewMockInvoker(mockCtrl)
	failback := register_failback(t, mockInvoker).(*failbackClusterInvoker)

	mockInvoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

	// First call fails ...
	failed := &protocol.RPCResult{Err: perrors.New("error")}
	mockInvoker.EXPECT().Invoke(gomock.Any()).Return(failed)

	// ... and the second one succeeds.
	succeeded := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
	mockInvoker.EXPECT().Invoke(gomock.Any()).Return(succeeded)

	// The failure is swallowed: the caller sees an empty, error-free result.
	res := failback.Invoke(&invocation.RPCInvocation{})
	assert.Nil(t, res.Error())
	assert.Nil(t, res.Result())
	assert.Equal(t, 0, len(res.Attachments()))

	// Wait long enough for the background retry to have been executed.
	time.Sleep(6 * time.Second)
	mockInvoker.EXPECT().Destroy().Return()

	failback.Destroy()

	assert.Equal(t, 0, failback.taskList.data.Len())
}

// Test_FailbackRetryFailed covers the case where every invocation fails: the
// caller still gets an empty, error-free result, and after the invoker has
// been destroyed one retry task is expected to remain in the queue.
func Test_FailbackRetryFailed(t *testing.T) {
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	mockInvoker := protocol.NewMockInvoker(mockCtrl)
	failback := register_failback(t, mockInvoker).(*failbackClusterInvoker)

	mockInvoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

	// Every call on the mock fails.
	alwaysFailed := &protocol.RPCResult{Err: perrors.New("error")}
	mockInvoker.EXPECT().Invoke(gomock.Any()).Return(alwaysFailed).AnyTimes()

	res := failback.Invoke(&invocation.RPCInvocation{})
	assert.Nil(t, res.Error())
	assert.Nil(t, res.Result())
	assert.Equal(t, 0, len(res.Attachments()))

	// Give the background retry loop some time to run.
	time.Sleep(2 * time.Second)
	mockInvoker.EXPECT().Destroy().Return()

	failback.Destroy()

	// Allow a failed retry task to be put back on the queue.
	time.Sleep(1 * time.Second)

	assert.Equal(t, 1, failback.taskList.data.Len())
}
Loading

0 comments on commit 63432a2

Please sign in to comment.