Skip to content

Commit

Permalink
add failback_cluster_invoker && failsafe_cluster_invoker. resolve apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
xujianhai666 committed Jul 27, 2019
1 parent 8b66663 commit ea55844
Show file tree
Hide file tree
Showing 9 changed files with 2,575 additions and 97 deletions.
192 changes: 96 additions & 96 deletions cluster/cluster_impl/failback_cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package cluster_impl

import (
"container/list"
perrors "github.com/pkg/errors"
"sync"
"time"

"github.com/apache/dubbo-go/common/logger"
)

import (
"github.com/Workiva/go-datastructures/queue"
perrors "github.com/pkg/errors"
)

import (
Expand All @@ -31,22 +36,26 @@ import (
"github.com/apache/dubbo-go/protocol"
)

/**
* When fails, record failure requests and schedule for retry on a regular interval.
* Especially useful for services of notification.
*
* <a href="http://en.wikipedia.org/wiki/Failback">Failback</a>
*/
type failbackClusterInvoker struct {
baseClusterInvoker
}

var (
retries int64
failbackTasks int64
ticker *time.Ticker
once sync.Once
lock sync.Mutex
taskList *Queue
)
ticker *time.Ticker
maxRetries int64
failbackTasks int64
taskList *queue.Queue
}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
taskList: queue.New(1000),
}
retriesConfig := invoker.GetUrl().GetParamInt(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
if retriesConfig <= 0 {
Expand All @@ -56,18 +65,71 @@ func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
retries = retriesConfig
failbackTasks = failbackTasksConfig
invoker.maxRetries = retriesConfig
invoker.failbackTasks = failbackTasksConfig
return invoker
}

func (invoker *failbackClusterInvoker) process() {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
for {
value, err := invoker.taskList.Peek()
if err == queue.ErrDisposed {
return
}
if err == queue.ErrEmptyQueue {
break
}

retryTask := value.(*retryTimerTask)
if time.Since(retryTask.lastT).Seconds() < 5 {
break
}

// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
break
}

go func(retryTask *retryTimerTask) {
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)

}
}
}

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())
retryTask.retries++
retryTask.lastT = time.Now()
if retryTask.retries > invoker.maxRetries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retryTask.retries, retryTask.invocation)
} else {
invoker.taskList.Put(retryTask)
}
}

func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result {

invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)

if err != nil {
// add retry ticker task
perrors.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
Expand All @@ -93,10 +155,20 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
result = ivk.Invoke(invocation)

if result.Error() != nil {
// add retry ticker task
addFailed(loadbalance, invocation, invokers, invoker)
invoker.once.Do(func() {
go invoker.process()
})

taskLen := invoker.taskList.Len()
if taskLen > 1000 {
logger.Warn("tasklist is too full %d > 1000.", taskLen)
}

timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)

perrors.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.",
methodName, invoker.GetUrl().Service(), result.Error().Error())
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
Expand All @@ -105,102 +177,30 @@ func (invoker *failbackClusterInvoker) Invoke(invocation protocol.Invocation) pr
}

func (invoker *failbackClusterInvoker) Destroy() {
//this is must atom operation
if invoker.destroyed.CAS(false, true) {
invoker.directory.Destroy()
}
// stop ticker
ticker.Stop()
}
invoker.baseClusterInvoker.Destroy()

func addFailed(balance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
invoker *failbackClusterInvoker) {
initSingleTickerTaskInstance()
// init one retryTimerTask
timerTask := newRetryTimerTask(balance, invocation, invokers, invoker, retries, 5)
taskList.push(timerTask)
// process ticker task
go func() {
<-ticker.C
value := taskList.pop()
if value == nil {
return
}

retryTask := value.(retryTimerTask)
invoked := []protocol.Invoker{}
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers,
invoked)
var result protocol.Result
result = retryInvoker.Invoke(retryTask.invocation)
if result.Error() != nil {
perrors.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.",
invocation.MethodName(), invoker.GetUrl().Service(), result.Error().Error())
retryTask.retries++
if retryTask.retries > retries {
perrors.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v",
retries, invocation)
} else {
taskList.push(retryTask)
}
}
}()
}

func initSingleTickerTaskInstance() {
once.Do(func() {
newTickerTask()
})
}
// stop ticker
invoker.ticker.Stop()

func newTickerTask() {
ticker = time.NewTicker(time.Second * 1)
taskList = newQueue()
_ = invoker.taskList.Dispose()
}

type retryTimerTask struct {
loadbalance cluster.LoadBalance
invocation protocol.Invocation
invokers []protocol.Invoker
lastInvoker *failbackClusterInvoker
lastInvoker protocol.Invoker
retries int64
tick int64
lastT time.Time
}

func newRetryTimerTask(loadbalance cluster.LoadBalance, invocation protocol.Invocation, invokers []protocol.Invoker,
lastInvoker *failbackClusterInvoker, retries int64, tick int64) *retryTimerTask {
lastInvoker protocol.Invoker) *retryTimerTask {
return &retryTimerTask{
loadbalance: loadbalance,
invocation: invocation,
invokers: invokers,
lastInvoker: lastInvoker,
retries: retries,
tick: tick,
lastT: time.Now(),
}
}

type Queue struct {
data *list.List
}

func newQueue() *Queue {
q := new(Queue)
q.data = list.New()
return q
}

func (q *Queue) push(v interface{}) {
defer lock.Unlock()
lock.Lock()
q.data.PushFront(v)
}

func (q *Queue) pop() interface{} {
defer lock.Unlock()
lock.Lock()
iter := q.data.Back()
v := iter.Value
q.data.Remove(iter)
return v
}
Loading

0 comments on commit ea55844

Please sign in to comment.