Skip to content

Commit

Permalink
consistent hash load balance
Browse files Browse the repository at this point in the history
Signed-off-by: imxyb <xyb4638@gmail.com>
  • Loading branch information
imxyb committed Nov 24, 2019
1 parent 6a691cd commit b5a0468
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 0 deletions.
164 changes: 164 additions & 0 deletions cluster/loadbalance/consistent_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"crypto/md5"
"encoding/json"
"fmt"
"hash/crc32"
"regexp"
"sort"
"strconv"
"strings"

"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/protocol"
)

const (
ConsistentHash = "consistenthash"
HashNodes = "hash.nodes"
HashArguments = "hash.arguments"
)

var selectors = make(map[string]*ConsistentHashSelector)
var re = regexp.MustCompile(constant.COMMA_SPLIT_PATTERN)

func init() {
extension.SetLoadbalance(ConsistentHash, NewConsistentHash)
}

type ConsistentHashLoadBalance struct {
}

func NewConsistentHash() cluster.LoadBalance {
return &ConsistentHashLoadBalance{}
}

func (lb *ConsistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
methodName := invocation.MethodName()
key := invokers[0].GetUrl().ServiceKey() + "." + methodName

// hash the invokers
bs := make([]byte, 0)
for _, invoker := range invokers {
b, err := json.Marshal(invoker)
if err != nil {
panic(fmt.Sprintf("parse json failed:%s", err.Error()))
}
bs = append(bs, b...)
}
hashCode := crc32.ChecksumIEEE(bs)
selector, ok := selectors[key]
if !ok || selector.hashCode != hashCode {
selectors[key] = NewConsistentHashSelector(invokers, methodName, hashCode)
selector = selectors[key]
}
return selector.Select(invocation)
}

type Uint32Slice []uint32

func (s Uint32Slice) Len() int {
return len(s)
}

func (s Uint32Slice) Less(i, j int) bool {
return s[i] < s[j]
}

func (s Uint32Slice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

type ConsistentHashSelector struct {
hashCode uint32
replicaNum int
virtualInvokers map[uint32]protocol.Invoker
keys Uint32Slice
argumentIndex []int
}

func NewConsistentHashSelector(invokers []protocol.Invoker, methodName string,
hashCode uint32) *ConsistentHashSelector {
if len(invokers) == 0 {
panic("none of invokers")
}

selector := &ConsistentHashSelector{}
selector.virtualInvokers = make(map[uint32]protocol.Invoker)
selector.hashCode = hashCode
url := invokers[0].GetUrl()
selector.replicaNum = int(url.GetMethodParamInt(methodName, HashNodes, 160))
indices := re.Split(url.GetMethodParam(methodName, HashArguments, "0"), -1)
for _, index := range indices {
i, err := strconv.Atoi(index)
if err != nil {
panic(err)
}
selector.argumentIndex = append(selector.argumentIndex, i)
}
for _, invoker := range invokers {
u := invoker.GetUrl()
address := u.Ip + ":" + u.Port
for i := 0; i < selector.replicaNum/4; i++ {
digest := md5.Sum([]byte(address + strconv.Itoa(i)))
for j := 0; j < 4; j++ {
key := selector.hash(digest, j)
selector.keys = append(selector.keys, key)
selector.virtualInvokers[key] = invoker
}
}
}
sort.Sort(selector.keys)
return selector
}

func (c *ConsistentHashSelector) Select(invocation protocol.Invocation) protocol.Invoker {
key := c.toKey(invocation.Arguments())
digest := md5.Sum([]byte(key))
return c.selectForKey(c.hash(digest, 0))
}

func (c *ConsistentHashSelector) toKey(args []interface{}) string {
var sb strings.Builder
for i := range c.argumentIndex {
if i >= 0 && i < len(args) {
fmt.Fprint(&sb, args[i].(string))
}
}
return sb.String()
}

func (c *ConsistentHashSelector) selectForKey(hash uint32) protocol.Invoker {
idx := sort.Search(len(c.keys), func(i int) bool {
return c.keys[i] >= hash
})
if idx == len(c.keys) {
idx = 0
}
return c.virtualInvokers[c.keys[idx]]
}

func (c *ConsistentHashSelector) hash(digest [16]byte, i int) uint32 {
return uint32((digest[3+i*4]&0xFF)<<24) | uint32((digest[2+i*4]&0xFF)<<16) |
uint32((digest[1+i*4]&0xFF)<<8) | uint32(digest[i*4]&0xFF)&0xFFFFFFF
}
106 changes: 106 additions & 0 deletions cluster/loadbalance/consistent_hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"context"
"testing"

"github.com/stretchr/testify/suite"

"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)

func TestConsistentHashSelectorSuite(t *testing.T) {
suite.Run(t, new(consistentHashSelectorSuite))
}

type consistentHashSelectorSuite struct {
suite.Suite
selector *ConsistentHashSelector
}

func (s *consistentHashSelectorSuite) SetupTest() {
var invokers []protocol.Invoker
url, _ := common.NewURL(context.TODO(),
"dubbo://192.168.1.0:20000/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
invokers = append(invokers, protocol.NewBaseInvoker(url))
s.selector = NewConsistentHashSelector(invokers, "echo", 999944)
}

func (s *consistentHashSelectorSuite) TestToKey() {
result := s.selector.toKey([]interface{}{"username", "age"})
s.Equal(result, "usernameage")
}

func (s *consistentHashSelectorSuite) TestSelectForKey() {
url1, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080")
url2, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081")
s.selector.virtualInvokers = make(map[uint32]protocol.Invoker)
s.selector.virtualInvokers[0] = protocol.NewBaseInvoker(url1)
s.selector.virtualInvokers[1] = protocol.NewBaseInvoker(url2)
s.selector.keys = []uint32{99874, 9999945}
result := s.selector.selectForKey(9999944)
s.Equal(result.GetUrl().String(), "dubbo://192.168.1.0:8081?")
}

func TestConsistentHashLoadBalanceSuite(t *testing.T) {
suite.Run(t, new(consistentHashLoadBalanceSuite))
}

type consistentHashLoadBalanceSuite struct {
suite.Suite
url1 common.URL
url2 common.URL
url3 common.URL
invokers []protocol.Invoker
invoker1 protocol.Invoker
invoker2 protocol.Invoker
invoker3 protocol.Invoker
lb cluster.LoadBalance
}

func (s *consistentHashLoadBalanceSuite) SetupTest() {
var err error
s.url1, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8080/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.url2, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8081/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)
s.url3, err = common.NewURL(context.TODO(), "dubbo://192.168.1.0:8082/org.apache.demo.HelloService?methods.echo.hash.arguments=0,1")
s.NoError(err)

s.invoker1 = protocol.NewBaseInvoker(s.url1)
s.invoker2 = protocol.NewBaseInvoker(s.url2)
s.invoker3 = protocol.NewBaseInvoker(s.url3)

s.invokers = append(s.invokers, s.invoker1, s.invoker2, s.invoker3)
s.lb = NewConsistentHash()
}

func (s *consistentHashLoadBalanceSuite) TestSelect() {
args := []interface{}{"name", "password", "age"}
invoker := s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8080")

args = []interface{}{"ok", "abc"}
invoker = s.lb.Select(s.invokers, invocation.NewRPCInvocation("echo", args, nil))
s.Equal(invoker.GetUrl().Location, "192.168.1.0:8082")
}
4 changes: 4 additions & 0 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ const (
APP_DYNAMIC_CONFIGURATORS_CATEGORY = "appdynamicconfigurators"
PROVIDER_CATEGORY = "providers"
)

const (
COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*"
)
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ require (
google.golang.org/grpc v1.22.1
gopkg.in/yaml.v2 v2.2.2
)

go 1.13

0 comments on commit b5a0468

Please sign in to comment.