Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add adaptive service #1649

Merged
merged 43 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8f8cf96
feat(adasvc): add infrastructure for adaptive service
justxuewei Oct 29, 2021
4dfc01e
feat(adasvc): reference config support adaptive service
justxuewei Oct 29, 2021
2ca1018
feat(adasvc): add p2c load balance
justxuewei Oct 29, 2021
e83a8a6
feat(cluster): add capacity evaluator interface
justxuewei Oct 31, 2021
f0c64e6
feat(cluster): add capacity updater
justxuewei Nov 1, 2021
69dd9d3
feat(cluster): add capacity updater
justxuewei Nov 3, 2021
2db43f7
feat(cluster): add fields to vegas capeva
justxuewei Nov 3, 2021
588f392
feat(cluster): refactor capeva interface
justxuewei Nov 4, 2021
e0a4d1a
feat(cluster): add more fields to vegas capeva
justxuewei Nov 4, 2021
9af855a
feat(cluster): vegas evaupdater done
justxuewei Nov 5, 2021
785c74c
Merge branch '3.0' into feat/adasvc
justxuewei Nov 6, 2021
1d5fca1
fix(common): fix typo
justxuewei Nov 6, 2021
7a537ff
fix(common): fix typo
justxuewei Nov 6, 2021
6d866b5
fix(cluster): add apache license
justxuewei Nov 6, 2021
865c058
feat(cluster): define limiter & update interface
justxuewei Nov 15, 2021
9d26e68
Merge branch '3.0' into feat/adasvc
justxuewei Nov 15, 2021
c3f9dbe
feat(cluster): remove cpu stat temporarily
justxuewei Nov 16, 2021
ff4e60b
feat(cluster): update hill climbing limiter
justxuewei Nov 17, 2021
0472bf1
feat(cluster): hill climbing done
justxuewei Nov 19, 2021
7e98a27
Merge branch '3.0' into feat/adasvc
justxuewei Nov 19, 2021
70ca487
fix(cluster): fix issue where init limitation is 0
justxuewei Nov 20, 2021
c949cef
Merge branch '3.0' into feat/adasvc
justxuewei Nov 20, 2021
ece019f
feat(cluster): provder-side filter done
justxuewei Nov 20, 2021
bf43b0b
fix(cluster): fix uint64 subtraction issue
justxuewei Nov 20, 2021
d8ca7f1
fix(cluster): add adaptivesvc filter to default service filters
justxuewei Nov 22, 2021
30dcb14
style: go fmt
justxuewei Nov 22, 2021
5b48b40
fix(filter): import adaptivesvc
justxuewei Nov 22, 2021
99f6919
Merge branch '3.0' into feat/adasvc
justxuewei Nov 22, 2021
f906714
Merge branch '3.0' into feat/adasvc
justxuewei Nov 25, 2021
c7edac5
fix(imports): import adaptivesvc cluster and p2c loadbalance
justxuewei Nov 26, 2021
73b4f70
fix(config): fix unexpectedly panic
justxuewei Nov 26, 2021
1fa48ca
feat(adasvc): add debug logs
justxuewei Dec 1, 2021
0acc52e
fix(adasvc): pass attachements with string
justxuewei Dec 1, 2021
e5de62f
feat(adasvc): detail debug logs
justxuewei Dec 2, 2021
6f1d7bf
fix(adasvc): fix log info
justxuewei Dec 2, 2021
4a04d4a
feat: detail dubbo logs
justxuewei Dec 5, 2021
4ed7505
feat: remove useless logs
justxuewei Dec 5, 2021
8b12eac
fix(adasvc): fix incorrect type
justxuewei Dec 5, 2021
9ac6763
Merge branch '3.0' into feat-adasvc
justxuewei Dec 5, 2021
c23612b
style: go fmt & dubbofmt
justxuewei Dec 5, 2021
0f7459b
fix: rpc result attrs is not initialized
justxuewei Dec 5, 2021
429a336
fix(protocol): fix result panic when attrs is not initialized
justxuewei Dec 5, 2021
bfc336e
Merge branch '3.0' into feat-adasvc
justxuewei Dec 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions cluster/cluster/adaptivesvc/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package adaptivesvc

import (
"sync"
)

import (
clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var (
once sync.Once
instance clusterpkg.Cluster
)

func init() {
extension.SetCluster(constant.ClusterKeyAdaptiveService, newAdaptiveServiceCluster)
}

// adaptiveServiceCluster is a cluster for adaptive service.
type adaptiveServiceCluster struct{}

func newAdaptiveServiceCluster() clusterpkg.Cluster {
if instance == nil {
once.Do(func() {
instance = &adaptiveServiceCluster{}
})
}
return instance
}

func (c *adaptiveServiceCluster) Join(directory directory.Directory) protocol.Invoker {
return clusterpkg.BuildInterceptorChain(newAdaptiveServiceClusterInvoker(directory))
}
85 changes: 85 additions & 0 deletions cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package adaptivesvc

import (
"context"
"strconv"
)

import (
perrors "github.com/pkg/errors"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/cluster/base"
"dubbo.apache.org/dubbo-go/v3/cluster/directory"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

type adaptiveServiceClusterInvoker struct {
base.ClusterInvoker
}

func newAdaptiveServiceClusterInvoker(directory directory.Directory) protocol.Invoker {
return &adaptiveServiceClusterInvoker{
ClusterInvoker: base.NewClusterInvoker(directory),
}
}

func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := ivk.Directory.List(invocation)
if err := ivk.CheckInvokers(invokers, invocation); err != nil {
return protocol.NewRPCResult(nil, err)
}

// get loadBalance
lbKey := invokers[0].GetURL().GetParam(constant.LoadbalanceKey, constant.LoadBalanceKeyP2C)
if lbKey != constant.LoadBalanceKeyP2C {
return protocol.NewRPCResult(nil, perrors.Errorf("adaptive service not supports %s load balance", lbKey))
}
lb := extension.GetLoadbalance(lbKey)

// select a node by the loadBalance
invoker := lb.Select(invokers, invocation)

// invoke
result := invoker.Invoke(ctx, invocation)

// update metrics
remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string)
remaining, err := strconv.Atoi(remainingStr)
if err != nil {
logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err)
return result
}
logger.Debugf("[adasvc cluster] The server status was received successfully, %s: %#v",
constant.AdaptiveServiceRemainingKey, remainingStr)
err = metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(),
invocation.MethodName(), metrics.HillClimbing, uint64(remaining))
if err != nil {
logger.Warnf("adaptive service metrics update is failed, err: %v", err)
return protocol.NewRPCResult(nil, err)
}

return result
}
6 changes: 3 additions & 3 deletions cluster/cluster/available/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
invokers := invoker.Directory.List(invocation)
err := invoker.CheckInvokers(invokers, invocation)
if err != nil {
return &protocol.RPCResult{Err: err}
return protocol.NewRPCResult(nil, err)
}

err = invoker.CheckWhetherDestroyed()
if err != nil {
return &protocol.RPCResult{Err: err}
return protocol.NewRPCResult(nil, err)
}

for _, ivk := range invokers {
if ivk.IsAvailable() {
return ivk.Invoke(ctx, invocation)
}
}
return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))}
return protocol.NewRPCResult(nil, errors.New(fmt.Sprintf("no provider available in %v", invokers)))
}
2 changes: 1 addition & 1 deletion cluster/cluster/available/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) {
invoker := mock.NewMockInvoker(ctrl)
clusterInvoker := registerAvailable(invoker)

mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}}
mockResult := protocol.NewRPCResult(clusterpkg.Rest{Tried: 0, Success: true}, nil)
invoker.EXPECT().IsAvailable().Return(true)
invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult)

Expand Down
1 change: 1 addition & 0 deletions cluster/cluster_impl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package cluster_impl
// This package may be DEPRECATED OR REMOVED in the future.

import (
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/adaptivesvc"
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/available"
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/broadcast"
_ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failback"
Expand Down
129 changes: 129 additions & 0 deletions cluster/loadbalance/p2c/loadbalance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package p2c

import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/loadbalance"
"dubbo.apache.org/dubbo-go/v3/cluster/metrics"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
)

func init() {
extension.SetLoadbalance(constant.LoadBalanceKeyP2C, newLoadBalance)
}

var (
once sync.Once
instance loadbalance.LoadBalance
)

type loadBalance struct{}

func newLoadBalance() loadbalance.LoadBalance {
if instance == nil {
once.Do(func() {
instance = &loadBalance{}
})
}
return instance
}

func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
if len(invokers) == 0 {
return nil
}
if len(invokers) == 1 {
return invokers[0]
}
// m is the Metrics, which saves the metrics of instance, invokers and methods
// The local metrics is available only for the earlier version.
m := metrics.LocalMetrics
// picks two nodes randomly
var i, j int
if len(invokers) == 2 {
i, j = 0, 1
} else {
rand.Seed(time.Now().Unix())
i = rand.Intn(len(invokers))
j = i
for i == j {
j = rand.Intn(len(invokers))
}
}
logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.",
i, j, invokers[i], invokers[j])

// TODO(justxuewei): please consider how to get the real method name from $invoke,
// see also [#1511](https://github.com/apache/dubbo-go/issues/1511)
methodName := invocation.MethodName()
// remainingIIface, remainingJIface means remaining capacity of node i and node j.
// If one of the metrics is empty, invoke the invocation to that node directly.
remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.")
return invokers[i]
}
logger.Warnf("get method metrics err: %v", err)
return nil
}

remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing)
if err != nil {
if errors.Is(err, metrics.ErrMetricsNotFound) {
logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.")
return invokers[j]
}
logger.Warnf("get method metrics err: %v", err)
return nil
}

// Convert interface to int, if the type is unexpected, panic immediately
remainingI, ok := remainingIIface.(uint64)
if !ok {
panic(fmt.Sprintf("[P2C select] the type of %s expects to be uint64, but gets %T",
metrics.HillClimbing, remainingIIface))
}

remainingJ, ok := remainingJIface.(uint64)
if !ok {
panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface))
}

logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ)

// For the remaining capacity, the bigger, the better.
if remainingI > remainingJ {
logger.Debugf("[P2C select] The invoker[i] was selected.")
return invokers[i]
}

logger.Debugf("[P2C select] The invoker[j] was selected.")
return invokers[j]
}
22 changes: 22 additions & 0 deletions cluster/metrics/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

const (
HillClimbing = "hill-climbing"
)
Loading