Skip to content

Commit

Permalink
add failback_cluster_invoker && failsafe_cluster_invoker. resolve apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Jul 24, 2019
1 parent 8bba410 commit 7f25ae9
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 147 deletions.
154 changes: 83 additions & 71 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,40 @@ package cluster_impl

import (
"container/list"
perrors "github.com/pkg/errors"
"sync"
"time"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)

/**
* When fails, record failure requests and schedule for retry on a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
type failbackClusterInvoker struct {
baseClusterInvoker
}

var (
retries int64
failbackTasks int64
ticker *time.Ticker
once sync.Once
lock sync.Mutex
maxRetries int64
failbackTasks int64
taskList *Queue
)
}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
taskList: newQueue(),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
Expand All @@ -56,11 +62,50 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
retries = retriesConfig
failbackTasks = failbackTasksConfig
invoker.maxRetries = retriesConfig
invoker.failbackTasks = failbackTasksConfig
go invoker.process()
return invoker
}

func (invoker *failbackClusterInvoker) process() {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
check := true
// check each timeout task and re-run
for check {
value := invoker.taskList.peek()
if value == nil {
break
}

retryTask := value.(*retryTimerTask)
if time.Since(retryTask.lastT).Seconds() < 5 {
break
}

invoker.taskList.pop()

invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
retryTask.invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
retryTask.retries++
if retryTask.retries > invoker.maxRetries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retryTask.retries, retryTask.invocation)
} else {
invoker.taskList.push(retryTask)
}
}
}
}
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

invokers := invoker.directory.List(invocation)
Expand Down Expand Up @@ -94,9 +139,11 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr

if result.Error() != nil {
// add retry ticker task
addFailed(loadbalance, invocation, invokers, invoker)
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, invoker, invoker.maxRetries, 5)
invoker.taskList.push(timerTask)

perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
methodName, invoker.GetUrl().Service(), result.Error().Error())
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
Expand All @@ -105,58 +152,10 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
}

func (invoker *failbackClusterInvoker) Destroy() {
//this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
// stop ticker
ticker.Stop()
}
invoker.baseClusterInvoker.Destroy()

func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
invoker *failbackClusterInvoker) {
initSingleTickerTaskInstance()
// init one retryTimerTask
timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
taskList.push(timerTask)
// process ticker task
go func() {
<-ticker.C
value := taskList.pop()
if value == nil {
return
}

retryTask := value.(retryTimerTask)
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
retryTask.retries++
if retryTask.retries > retries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retries, invocation)
} else {
taskList.push(retryTask)
}
}
}()
}

func initSingleTickerTaskInstance() {
once.Do(func() {
newTickerTask()
})
}

func newTickerTask() {
ticker = time.NewTicker(time.Second * 1)
taskList = newQueue()
// stop ticker
invoker.ticker.Stop()
}

type retryTimerTask struct {
Expand All @@ -165,7 +164,7 @@ type retryTimerTask struct {
invokers []protocol.Invoker
lastInvoker *failbackClusterInvoker
retries int64
tick int64
lastT time.Time
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
Expand All @@ -176,31 +175,44 @@ func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invo
invokers: invokers,
lastInvoker: lastInvoker,
retries: retries,
tick: tick,
lastT: time.Now(),
}
}

type Queue struct {
data *list.List
sync.Locker
}

func newQueue() *Queue {
q := new(Queue)
q.data = list.New()
q := &Queue{
data: list.New(),
Locker: new(sync.Mutex),
}
return q
}

func (q *Queue) push(v interface{}) {
defer lock.Unlock()
lock.Lock()
defer q.Unlock()
q.Lock()
q.data.PushFront(v)
}

func (q *Queue) pop() interface{} {
defer lock.Unlock()
lock.Lock()
defer q.Unlock()
q.Lock()
iter := q.data.Back()
v := iter.Value
q.data.Remove(iter)
return v
}

func (q *Queue) peek() interface{} {
defer q.Unlock()
q.Lock()
iter := q.data.Back()
if iter == nil {
return nil
}
return iter.Value
}
131 changes: 131 additions & 0 deletions cluster/cluster_impl/failback_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,136 @@ limitations under the License.

package cluster_impl

import (
"context"
"testing"
"time"
)

import (
"github.com/golang/mock/gomock"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/cluster/directory"
"github.com/apache/dubbo-go/cluster/loadbalance"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)

func TestQueue(t *testing.T) {
q := newQueue()
d0 := retryTimerTask{retries: 0}
d1 := retryTimerTask{retries: 1}
d2 := retryTimerTask{retries: 2}
q.push(d0)
q.push(d1)
q.push(d2)

d := q.peek()
assert.Equal(t, d, d0)
d = q.pop()
assert.Equal(t, d, d0)
d = q.pop()
assert.Equal(t, d, d1)
d = q.pop()
assert.Equal(t, d, d2)
}

var (
failbackUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider")
)

// register_failback register failbackCluster to cluster extension.
func register_failback(t *testing.T, invoker *protocol.MockInvoker) protocol.Invoker {
extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance)
failbackCluster := NewFailbackCluster()

invokers := []protocol.Invoker{}
invokers = append(invokers, invoker)

invoker.EXPECT().GetUrl().Return(failbackUrl)

staticDir := directory.NewStaticDirectory(invokers)
clusterInvoker := failbackCluster.Join(staticDir)
return clusterInvoker
}

func Test_FailbackSuceess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := protocol.NewMockInvoker(ctrl)
clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).Times(1)

mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Equal(t, mockResult, result)
}

func Test_FailbackRetryOneSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := protocol.NewMockInvoker(ctrl)
clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

// failed at first
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error"),}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult)

// success second
mockSuccResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockSuccResult)

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))

// ensure the retry task has been executed
time.Sleep(6 * time.Second)
invoker.EXPECT().Destroy().Return()

clusterInvoker.Destroy()

assert.Equal(t, 0, clusterInvoker.taskList.data.Len())
}

func Test_FailbackRetryFailed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

invoker := protocol.NewMockInvoker(ctrl)
clusterInvoker := register_failback(t, invoker).(*failbackClusterInvoker)

invoker.EXPECT().GetUrl().Return(failbackUrl).AnyTimes()

// failed always
mockFailedResult := &protocol.RPCResult{Err: perrors.New("error"),}
invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult).AnyTimes()

result := clusterInvoker.Invoke(&invocation.RPCInvocation{})
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
assert.Equal(t, 0, len(result.Attachments()))

time.Sleep(2 * time.Second) // for retry work.
invoker.EXPECT().Destroy().Return()

clusterInvoker.Destroy()

time.Sleep( 1 * time.Second) // for retryTimerTask thrown back to queue

assert.Equal(t, 1, clusterInvoker.taskList.data.Len())
}
7 changes: 7 additions & 0 deletions cluster/cluster_impl/failsafe_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ import (
"github.com/apache/dubbo-go/protocol"
)

/**
* When invoke fails, log the error message and ignore this error by returning an empty Result.
* Usually used to write audit logs and other operations
*
* <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a>
*
*/
type failsafeClusterInvoker struct {
baseClusterInvoker
}
Expand Down
Loading

0 comments on commit 7f25ae9

Please sign in to comment.