add failback_cluster_invoker && failsafe_cluster_invoker. resolve #135
xujianhai666 committed Jul 27, 2019
1 parent 8b66663 commit 6cb8a9e
Showing 9 changed files with 2,526 additions and 73 deletions.
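
The failback pattern added here records a failed request instead of surfacing the error, then retries it in the background on a fixed interval until it either succeeds or exceeds a configurable retry budget, which keeps the original caller non-blocking. Below is a simplified, self-contained Go sketch of that flow under illustrative names (failback, retryTask, enqueue, drain are placeholders, not the committed dubbo-go API): failed calls are queued with a timestamp and retry counter, and a ticker-driven loop drains the tasks that are due, re-queueing them until maxRetries is exceeded.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// retryTask mirrors the retryTimerTask idea: what to retry, how many
// attempts have been made, and when the last attempt happened.
type retryTask struct {
	name    string
	retries int64
	lastT   time.Time
}

// failback keeps a mutex-guarded retry queue that a background loop drains.
type failback struct {
	mu         sync.Mutex
	tasks      []*retryTask
	maxRetries int64
}

// enqueue records a failed call for a later retry instead of returning the error.
func (f *failback) enqueue(t *retryTask) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.tasks = append(f.tasks, t)
}

// drain retries every task whose last attempt is older than minAge and
// re-queues it on failure until maxRetries is exceeded.
func (f *failback) drain(minAge time.Duration, call func(string) error) {
	f.mu.Lock()
	pending := f.tasks
	f.tasks = nil
	f.mu.Unlock()

	for _, t := range pending {
		if time.Since(t.lastT) < minAge {
			f.enqueue(t) // not due yet, keep it queued
			continue
		}
		if err := call(t.name); err != nil {
			t.retries++
			t.lastT = time.Now()
			if t.retries > f.maxRetries {
				fmt.Printf("abandoning %s after %d retries: %v\n", t.name, t.retries, err)
				continue
			}
			f.enqueue(t)
			continue
		}
		fmt.Printf("retry of %s succeeded\n", t.name)
	}
}

func main() {
	f := &failback{maxRetries: 3}

	// The initial invocation failed: record it and give the caller an empty result.
	f.enqueue(&retryTask{name: "sayHello", lastT: time.Now()})

	// Simulated downstream call that fails once and then succeeds.
	attempts := 0
	call := func(name string) error {
		attempts++
		if attempts < 2 {
			return errors.New("still failing")
		}
		return nil
	}

	// Background loop analogous to process(); intervals shortened for the demo.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		f.drain(50*time.Millisecond, call)
	}
}
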
179 changes: 107 additions & 72 deletions cluster/cluster_impl/failback_cluster_invoker.go
@@ -19,34 +19,42 @@ package cluster_impl

import (
"container/list"
perrors "github.com/pkg/errors"
"sync"
"time"
)

import (
perrors "github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

/**
* When fails, record failure requests and schedule for retry on a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
type failbackClusterInvoker struct {
baseClusterInvoker
}

var (
retries int64
failbackTasks int64
ticker *time.Ticker
once sync.Once
lock sync.Mutex
ticker *time.Ticker
maxRetries int64
failbackTasks int64
taskList *Queue
)
}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
taskList: newQueue(),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
@@ -56,18 +64,68 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
retries = retriesConfig
failbackTasks = failbackTasksConfig
invoker.maxRetries = retriesConfig
invoker.failbackTasks = failbackTasksConfig
return invoker
}

func (invoker *failbackClusterInvoker) process() {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
for {
value := invoker.taskList.peek()
if value == nil {
break
}

retryTask := value.(*retryTimerTask)
if time.Since(retryTask.lastT).Seconds() < 5 {
break
}

invoker.taskList.pop()

err := invoker.checkWhetherDestroyed()
if err != nil {
invoker.checkRetry(retryTask, err)
return
}

go func(retryTask *retryTimerTask) {
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)

}
}
}

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())
retryTask.retries++
if retryTask.retries > invoker.maxRetries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retryTask.retries, retryTask.invocation)
} else {
invoker.taskList.push(retryTask)
}
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)

if err != nil {
// add retry ticker task
perrors.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
@@ -93,10 +151,16 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
result = ivk.Invoke(invocation)

if result.Error() != nil {
invoker.once.Do(func() {
go invoker.process()
})

// add retry ticker task
addFailed(loadbalance, invocation, invokers, invoker)
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, invoker, invoker.maxRetries, 5)
invoker.taskList.push(timerTask)

perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
methodName, invoker.GetUrl().Service(), result.Error().Error())
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
@@ -105,58 +169,10 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
}

func (invoker *failbackClusterInvoker) Destroy() {
//this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
// stop ticker
ticker.Stop()
}

func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
invoker *failbackClusterInvoker) {
initSingleTickerTaskInstance()
// init one retryTimerTask
timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
taskList.push(timerTask)
// process ticker task
go func() {
<-ticker.C
value := taskList.pop()
if value == nil {
return
}

retryTask := value.(retryTimerTask)
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
retryTask.retries++
if retryTask.retries > retries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retries, invocation)
} else {
taskList.push(retryTask)
}
}
}()
}
invoker.baseClusterInvoker.Destroy()

func initSingleTickerTaskInstance() {
once.Do(func() {
newTickerTask()
})
}

func newTickerTask() {
ticker = time.NewTicker(time.Second * 1)
taskList = newQueue()
// stop ticker
invoker.ticker.Stop()
}

type retryTimerTask struct {
@@ -165,7 +181,7 @@ type retryTimerTask struct {
invokers []protocol.Invoker
lastInvoker *failbackClusterInvoker
retries int64
tick int64
lastT time.Time
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
@@ -176,31 +192,50 @@ func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invo
invokers: invokers,
lastInvoker: lastInvoker,
retries: retries,
tick: tick,
lastT: time.Now(),
}
}

type Queue struct {
data *list.List
sync.Locker
}

func newQueue() *Queue {
q := new(Queue)
q.data = list.New()
q := &Queue{
data: list.New(),
Locker: new(sync.Mutex),
}
return q
}

func (q *Queue) push(v interface{}) {
defer lock.Unlock()
lock.Lock()
defer q.Unlock()
q.Lock()
if q.data.Len() > 1000 {
logger.Warnf("retry work in failbackClusterInvoker is too large: %d > 1000", q.data.Len())
}
q.data.PushFront(v)
}

func (q *Queue) pop() interface{} {
defer lock.Unlock()
lock.Lock()
defer q.Unlock()
q.Lock()
iter := q.data.Back()
if iter == nil {
return nil
}
v := iter.Value
q.data.Remove(iter)
return v
}

func (q *Queue) peek() interface{} {
defer q.Unlock()
q.Lock()
iter := q.data.Back()
if iter == nil {
return nil
}
return iter.Value
}
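
One note on the Queue above: push adds at the front (PushFront) while pop and peek read from the back (Back), so tasks come out in FIFO order and the oldest failure is retried first; the 1000-element check only logs a warning and does not reject the push. A minimal standalone demonstration of that ordering, using only container/list from the standard library:

package main

import (
	"container/list"
	"fmt"
)

func main() {
	q := list.New()

	// push: the newest element goes to the front, like Queue.push.
	for _, task := range []string{"task-1", "task-2", "task-3"} {
		q.PushFront(task)
	}

	// pop: take from the back, like Queue.pop, so the oldest element comes out first.
	for q.Len() > 0 {
		back := q.Back()
		fmt.Println(back.Value) // prints task-1, task-2, task-3 in FIFO order
		q.Remove(back)
	}
}
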