Skip to content

Commit

Permalink
add failback_cluster_invoker && failsafe_cluster_invoker. resolve #135
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Aug 2, 2019
1 parent ba7178a commit f88c400
Show file tree
Hide file tree
Showing 9 changed files with 546 additions and 117 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ vendor/

logs/
.vscode/
coverage.txt
201 changes: 99 additions & 102 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,37 @@
package cluster_impl

import (
"container/list"
perrors "github.com/pkg/errors"
"sync"
"time"
)

import (
"github.com/Workiva/go-datastructures/queue"
)

import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

/**
* When an invocation fails, record the failed request and retry it in the background at a regular interval.
* Especially useful for notification services.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
type failbackClusterInvoker struct {
baseClusterInvoker
}

var (
retries int64
failbackTasks int64
ticker *time.Ticker
once sync.Once
lock sync.Mutex
taskList *Queue
)
ticker *time.Ticker
maxRetries int64
failbackTasks int64
taskList *queue.Queue
}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
Expand All @@ -56,19 +62,71 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
retries = retriesConfig
failbackTasks = failbackTasksConfig
invoker.maxRetries = retriesConfig
invoker.failbackTasks = failbackTasksConfig
return invoker
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
// process is the background retry loop of the failback invoker.
//
// It creates a one-second ticker, stores it on the invoker, and on every tick
// re-invokes the queued tasks that have waited long enough. The loop ends when
// the task queue reports that it has been disposed.
//
// NOTE(review): Ticker.Stop does not close the ticker channel. If the ticker is
// stopped from elsewhere before the queue is disposed, this loop will not wake
// up again to observe the disposal — confirm the shutdown order used by Destroy.
func (invoker *failbackClusterInvoker) process() {
	ticker := time.NewTicker(time.Second * 1)
	invoker.ticker = ticker
	// Release the ticker on every exit path; calling Stop more than once is harmless.
	defer ticker.Stop()

	for range ticker.C {
		if disposed := invoker.retryDueTasks(); disposed {
			return
		}
	}
}

// retryDueTasks removes every task at the head of the queue whose wait time
// has elapsed and retries each of them in its own goroutine. It stops at the
// first task that is not due yet, or when the queue is empty.
//
// It returns true when the queue has been disposed and the caller should stop.
func (invoker *failbackClusterInvoker) retryDueTasks() bool {
	// Minimum time a failed task waits in the queue before it is retried.
	const retryDelay = 5 * time.Second

	for {
		value, err := invoker.taskList.Peek()
		if err == queue.ErrDisposed {
			return true
		}
		if err != nil {
			// An empty queue is the normal idle case. Any other error means
			// value is unusable, so never fall through to the type assertion.
			if err != queue.ErrEmptyQueue {
				logger.Warnf("peek task found err: %v\n", err)
			}
			return false
		}

		retryTask := value.(*retryTimerTask)
		if time.Since(retryTask.lastT) < retryDelay {
			// The head task is not due yet; check again on the next tick.
			return false
		}

		// The returned items are ignored: the head was just peeked, so this
		// call only removes it from the queue.
		if _, err = invoker.taskList.Get(1); err != nil {
			logger.Warnf("get task found err: %v\n", err)
			return err == queue.ErrDisposed
		}

		go invoker.retry(retryTask)
	}
}

// retry re-invokes a single failed task. The previously used invoker is passed
// to doSelect as already invoked. When the retry fails again, the task is
// handed to checkRetry.
func (invoker *failbackClusterInvoker) retry(retryTask *retryTimerTask) {
	invoked := []protocol.Invoker{retryTask.lastInvoker}

	retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
	result := retryInvoker.Invoke(retryTask.invocation)
	if result.Error() != nil {
		retryTask.lastInvoker = retryInvoker
		invoker.checkRetry(retryTask, result.Error())
	}
}

// checkRetry records a failed retry attempt for retryTask and decides whether
// the task gets another chance.
//
// It logs err, increments the task's retry counter and refreshes its
// last-attempt time. When the counter exceeds invoker.maxRetries the task is
// abandoned; otherwise it is put back on the task queue.
func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
	// err is formatted with %v directly so that a nil error cannot panic here.
	logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.\n",
		retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err)

	retryTask.retries++
	retryTask.lastT = time.Now()

	if retryTask.retries > invoker.maxRetries {
		// Report the configured threshold, not the task's own counter.
		logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.\n",
			invoker.maxRetries, retryTask.invocation)
		return
	}

	// Put fails once the queue has been disposed; log it rather than dropping
	// the task silently.
	if putErr := invoker.taskList.Put(retryTask); putErr != nil {
		logger.Warnf("put retry task back to tasklist found err: %v\n", putErr)
	}
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)

if err != nil {
// add retry ticker task
perrors.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
}
Expand All @@ -84,19 +142,30 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
}
loadbalance := extension.GetLoadbalance(lb)

invoked := []protocol.Invoker{}
invoked := make([]protocol.Invoker, 0)
var result protocol.Result

ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)

if result.Error() != nil {
// add retry ticker task
addFailed(loadbalance, invocation, invokers, invoker)
perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
methodName, invoker.GetUrl().Service(), result.Error().Error())
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
go invoker.process()
})

taskLen := invoker.taskList.Len()
if taskLen >= invoker.failbackTasks {
logger.Warnf("tasklist is too full > %d.\n", taskLen)
return &protocol.RPCResult{}
}

timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)

logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.\n",
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
Expand All @@ -105,102 +174,30 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
}

func (invoker *failbackClusterInvoker) Destroy() {
//this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
// stop ticker
ticker.Stop()
}

func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
invoker *failbackClusterInvoker) {
initSingleTickerTaskInstance()
// init one retryTimerTask
timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
taskList.push(timerTask)
// process ticker task
go func() {
<-ticker.C
value := taskList.pop()
if value == nil {
return
}

retryTask := value.(retryTimerTask)
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
retryTask.retries++
if retryTask.retries > retries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retries, invocation)
} else {
taskList.push(retryTask)
}
}
}()
}
invoker.baseClusterInvoker.Destroy()

func initSingleTickerTaskInstance() {
once.Do(func() {
newTickerTask()
})
}
// stop ticker
invoker.ticker.Stop()

func newTickerTask() {
ticker = time.NewTicker(time.Second * 1)
taskList = newQueue()
_ = invoker.taskList.Dispose()
}

type retryTimerTask struct {
loadbalance cluster.LoadBalance
invocation protocol.Invocation
invokers []protocol.Invoker
lastInvoker *failbackClusterInvoker
lastInvoker protocol.Invoker
retries int64
tick int64
lastT time.Time
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
lastInvoker *failbackClusterInvoker, retries int64, tick int64) *retryTimerTask {
lastInvoker protocol.Invoker) *retryTimerTask {
return &retryTimerTask{
loadbalance: loadbalance,
invocation: invocation,
invokers: invokers,
lastInvoker: lastInvoker,
retries: retries,
tick: tick,
lastT: time.Now(),
}
}

// Queue is a small mutex-guarded FIFO built on container/list.
// Values are pushed at the front of the list and popped from the back.
type Queue struct {
	// mu guards data. It lives on the queue itself so that independent queues
	// do not contend on a shared package-level lock.
	mu   sync.Mutex
	data *list.List
}

// newQueue returns an empty, ready-to-use Queue.
func newQueue() *Queue {
	return &Queue{data: list.New()}
}

// push adds v to the queue.
func (q *Queue) push(v interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.data.PushFront(v)
}

// pop removes and returns the oldest value in the queue.
// It returns nil when the queue is empty.
func (q *Queue) pop() interface{} {
	q.mu.Lock()
	defer q.mu.Unlock()

	oldest := q.data.Back()
	if oldest == nil {
		// Back returns nil for an empty list; report "nothing to pop"
		// instead of dereferencing it.
		return nil
	}
	return q.data.Remove(oldest)
}
Loading

0 comments on commit f88c400

Please sign in to comment.