diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0c74d2d..9f21f8e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -6,8 +6,8 @@ jobs: unit-benchmark-test: strategy: matrix: - go: [ 1.17, 1.18, 1.19 ] - os: [ X64, ARM64 ] + go: [ "1.18", "1.19", "1.20", "1.21", "1.22"] + os: [ X64 ] runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index e27e407..49cbaa7 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,10 @@ To enable xDS mode in Kitex, we should invoke `xds.Init()` to initialize the xds #### Bootstrap The xdsClient is responsible for the interaction with the xDS Server (i.e. Istio). It needs some environment variables for initialization, which need to be set inside the `spec.containers.env` of the Kubernetes Manifest file in YAML format. -* `POD_NAMESPACE`: the namespace of the current service. +* `POD_NAMESPACE`: the namespace of the current service. * `POD_NAME`: the name of this pod. * `INSTANCE_IP`: the ip of this pod. +* `KITEX_XDS_METAS`: the metadata of this xDS node. Add the following part to the definition of your container that uses xDS-enabled Kitex client. @@ -50,6 +51,8 @@ valueFrom: valueFrom: fieldRef: fieldPath: status.podIP +- name: KITEX_XDS_METAS + value: '{"CLUSTER_ID":"Kubernetes","DNS_AUTO_ALLOCATE":"true","DNS_CAPTURE":"true","INSTANCE_IPS":"$(INSTANCE_IP)","NAMESPACE":"$(POD_NAMESPACE)"}' ``` ### Client-side @@ -231,6 +234,10 @@ spec: failure_percentage_threshold: 10 # the failure percentage request volume failure_percentage_request_volume: 101 + workloadSelector: + labels: + # the label of the client pod. + app.kubernetes.io/name: kitex-client ``` #### RateLimit @@ -262,12 +269,72 @@ spec: max_tokens: 4 workloadSelector: labels: - # the label of the service pod. + # the label of the server pod. app.kubernetes.io/name: kitex-server ``` +#### Retry + +Support using VirtualService and EnvoyFilter to config retry policy, the EnvoyFilter has more configuration. +``` +apiVersion: networking.istio.io/v1 +kind: VirtualService +metadata: + name: retry-sample + namespace: default +spec: + hosts: + - hello.prod.svc.cluster.local:21001 + http: + - route: + - destination: + host: hello.prod.svc.cluster.local:21001 + retries: + attempts: 1 + perTryTimeout: 2s +``` + +``` +apiVersion: networking.istio.io/v1alpha3 +kind: EnvoyFilter +metadata: + name: retry-enhance + namespace: default +spec: + configPatches: + - applyTo: HTTP_ROUTE + match: + context: SIDECAR_OUTBOUND + routeConfiguration: + # service name, should obey FQDN + name: hello.default.svc.cluster.local:21001 + vhost: + # service name, should obey FQDN + name: hello.default.svc.cluster.local:21001 + patch: + operation: MERGE + value: + route: + retryPolicy: + numRetries: 3 + perTryTimeout: 100ms + retryBackOff: + baseInterval: 100ms + maxInterval: 100ms + retriableHeaders: + - name: "kitexRetryErrorRate" + stringMatch: + exact: "0.29" + - name: "kitexRetryMethods" + stringMatch: + exact: "Echo,Greet" + workloadSelector: + labels: + # the label of the service pod. + app.kubernetes.io/name: kitex-client +``` ## Example The usage is as follows: diff --git a/README_CN.md b/README_CN.md index 7aea016..6bf48b2 100644 --- a/README_CN.md +++ b/README_CN.md @@ -32,9 +32,10 @@ Kitex 通过外部扩展 [kitex-contrib/xds](https://github.com/kitex-contrib/xd xdsClient 负责与控制面(例如 Istio)交互,以获得所需的 xDS 资源。在初始化时,需要读取环境变量用于构建 node 标识。所以,需要在K8S 的容器配置文件 `spec.containers.env` 部分加入以下几个环境变量。 -* `POD_NAMESPACE`: 当前 pod 所在的 namespace。 +* `POD_NAMESPACE`: 当前 pod 所在的 namespace。 * `POD_NAME`: pod 名。 * `INSTANCE_IP`: pod 的 ip。 +* `KITEX_XDS_METAS`: 用于构建 node 标识的元信息,格式为 json 字符串。 在需要使用 xDS 功能的容器配置中加入以下定义即可: @@ -51,6 +52,8 @@ valueFrom: valueFrom: fieldRef: fieldPath: status.podIP +- name: KITEX_XDS_METAS + value: '{"CLUSTER_ID":"Kubernetes","DNS_AUTO_ALLOCATE":"true","DNS_CAPTURE":"true","INSTANCE_IPS":"$(INSTANCE_IP)","NAMESPACE":"$(POD_NAMESPACE)"}' ``` ### Kitex 客户端 @@ -221,6 +224,10 @@ spec: failure_percentage_threshold: 10 # 触发熔断请求量 failure_percentage_request_volume: 101 + workloadSelector: + labels: + # the label of the client pod. + app.kubernetes.io/name: kitex-client ``` #### 限流配置 @@ -256,6 +263,67 @@ spec: app.kubernetes.io/name: kitex-server ``` +#### 重试配置 + +重试支持两个配置方式:VirtualService 和 EnvoyFilter,EnvoyFilter 支持更丰富的重试策略。 + +``` +apiVersion: networking.istio.io/v1 +kind: VirtualService +metadata: + name: retry-sample + namespace: default +spec: + hosts: + - hello.prod.svc.cluster.local:21001 + http: + - route: + - destination: + host: hello.prod.svc.cluster.local:21001 + retries: + attempts: 1 + perTryTimeout: 2s +``` + +``` +apiVersion: networking.istio.io/v1alpha3 +kind: EnvoyFilter +metadata: + name: retry-enhance + namespace: default +spec: + configPatches: + - applyTo: HTTP_ROUTE + match: + context: SIDECAR_OUTBOUND + routeConfiguration: + # service name, should obey FQDN + name: hello.default.svc.cluster.local:21001 + vhost: + # service name, should obey FQDN + name: hello.default.svc.cluster.local:21001 + patch: + operation: MERGE + value: + route: + retryPolicy: + numRetries: 3 + perTryTimeout: 100ms + retryBackOff: + baseInterval: 100ms + maxInterval: 100ms + retriableHeaders: + - name: "kitexRetryErrorRate" + stringMatch: + exact: "0.29" + - name: "kitexRetryMethods" + stringMatch: + exact: "Echo,Greet" + workloadSelector: + labels: + # the label of the service pod. + app.kubernetes.io/name: kitex-client +``` ## 示例 完整的客户端用法如下: diff --git a/core/xdsresource/rds.go b/core/xdsresource/rds.go index a59c448..2f70a0b 100644 --- a/core/xdsresource/rds.go +++ b/core/xdsresource/rds.go @@ -19,6 +19,8 @@ package xdsresource import ( "encoding/json" "fmt" + "strconv" + "strings" "time" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -26,6 +28,12 @@ import ( "google.golang.org/protobuf/proto" ) +const ( + // If the error rate large than it, the retry policy do not take affect. + cBErrorRateKey = "kitexRetryErrorRate" + retryMethods = "kitexRetryMethods" +) + // RouteConfigResource is used for routing // HTTPRouteConfig is the native http route config, which consists of a list of virtual hosts. // ThriftRouteConfig is converted from the routeConfiguration in thrift proxy, which can only be configured in the listener filter @@ -34,6 +42,7 @@ type RouteConfigResource struct { HTTPRouteConfig *HTTPRouteConfig ThriftRouteConfig *ThriftRouteConfig MaxTokens uint32 + TokensPerFill uint32 } type HTTPRouteConfig struct { @@ -64,7 +73,9 @@ type RetryPolicy struct { NumRetries int PerTryTimeout time.Duration PerTryIdleTimeout time.Duration + CBErrorRate float64 RetryBackOff *RetryBackOff + Methods []string } type Route struct { @@ -181,6 +192,26 @@ func unmarshalRoutes(rs []*v3routepb.Route) ([]*Route, error) { PerTryTimeout: retryPolicy.GetPerTryTimeout().AsDuration(), PerTryIdleTimeout: retryPolicy.GetPerTryIdleTimeout().AsDuration(), } + // used for config the errRate. + for _, header := range retryPolicy.GetRetriableHeaders() { + match := header.GetStringMatch() + if match == nil { + continue + } + value := match.GetExact() + if value == "" { + continue + } + switch header.Name { + case cBErrorRateKey: + errRate, err := strconv.ParseFloat(value, 64) + if err == nil { + route.RetryPolicy.CBErrorRate = errRate + } + case retryMethods: + route.RetryPolicy.Methods = strings.Split(value, ",") + } + } if backoff := retryPolicy.GetRetryBackOff(); backoff != nil { route.RetryPolicy.RetryBackOff = &RetryBackOff{ BaseInterval: backoff.GetMaxInterval().AsDuration(), diff --git a/go.mod b/go.mod index 03ec923..e2d8531 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,16 @@ module github.com/kitex-contrib/xds -go 1.17 +go 1.18 require ( - github.com/bytedance/gopkg v0.1.0 + github.com/bytedance/gopkg v0.1.1 github.com/cenkalti/backoff/v4 v4.1.0 github.com/cloudwego/kitex v0.10.3 github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe github.com/envoyproxy/go-control-plane v0.11.1 github.com/golang/protobuf v1.5.3 github.com/google/go-cmp v0.5.9 - github.com/stretchr/testify v1.8.3 + github.com/stretchr/testify v1.9.0 go.uber.org/atomic v1.11.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e google.golang.org/protobuf v1.30.0 @@ -52,10 +52,10 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect golang.org/x/arch v0.2.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.19.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e // indirect google.golang.org/grpc v1.56.3 // indirect diff --git a/go.sum b/go.sum index 8e2fa93..e2bde92 100644 --- a/go.sum +++ b/go.sum @@ -614,8 +614,8 @@ github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= github.com/bytedance/gopkg v0.0.0-20240514070511-01b2cbcf35e1/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= -github.com/bytedance/gopkg v0.1.0 h1:aAxB7mm1qms4Wz4sp8e1AtKDOeFLtdqvGiUe7aonRJs= -github.com/bytedance/gopkg v0.1.0/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= +github.com/bytedance/gopkg v0.1.1 h1:3azzgSkiaw79u24a+w9arfH8OfnQQ4MHUt9lJFREEaE= +github.com/bytedance/gopkg v0.1.1/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= github.com/bytedance/sonic v1.11.8 h1:Zw/j1KfiS+OYTi9lyB3bb0CFxPJVkM17k1wyDG32LRA= github.com/bytedance/sonic v1.11.8/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4= @@ -914,6 +914,7 @@ github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcD github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -924,8 +925,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.9.3 h1:hqzS9wAHMO+KVBBkLxYdkEeeFHuqr95GfClRLKlgK0E= github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -969,6 +972,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1085,8 +1090,10 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1131,8 +1138,9 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1213,8 +1221,10 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -1225,6 +1235,8 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1241,8 +1253,9 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/xdssuite/circuitbreak.go b/xdssuite/circuitbreak.go index cbb666b..7474b31 100644 --- a/xdssuite/circuitbreak.go +++ b/xdssuite/circuitbreak.go @@ -86,7 +86,8 @@ func updateCircuitPolicy(res map[string]xdsresource.Resource, handler func(map[s } // NewCircuitBreaker integrate xds config and kitex circuitbreaker -func NewCircuitBreaker() client.Option { +func NewCircuitBreaker(opts ...Option) client.Option { + opt := NewOptions(opts) m := xdsResourceManager.getManager() if m == nil { panic("xds resource manager has not been initialized") @@ -98,7 +99,10 @@ func NewCircuitBreaker() client.Option { m.RegisterXDSUpdateHandler(xdsresource.ClusterType, func(res map[string]xdsresource.Resource) { updateCircuitPolicy(res, cb.updateAllCircuitConfigs) }) - return client.WithCircuitBreaker(cb.cb) + if opt.enableServiceCircuitBreak { + return client.WithCircuitBreaker(cb.cb) + } + return client.WithInstanceMW(cb.cb.ServiceCBMW()) } // keep consistent when initialising the circuit breaker suit and updating diff --git a/xdssuite/option.go b/xdssuite/option.go index f6b88b7..f83782e 100644 --- a/xdssuite/option.go +++ b/xdssuite/option.go @@ -29,8 +29,10 @@ type routerMetaExtractor func(context.Context) map[string]string // Options for xds suite type Options struct { - routerMetaExtractor routerMetaExtractor // use metainfo.GetAllValues by default. - servicePort uint32 + routerMetaExtractor routerMetaExtractor // use metainfo.GetAllValues by default. + servicePort uint32 + matchRetryMethod bool + enableServiceCircuitBreak bool } func (o *Options) Apply(opts []Option) { @@ -53,6 +55,33 @@ func NewOptions(opts []Option) *Options { return o } +// WithServicePort configures the service port, used for rate limit. +func WithServicePort(port uint32) Option { + return Option{ + F: func(o *Options) { + o.servicePort = port + }, + } +} + +// WithMatchRetryMethod configures the flag of matchRetryMethod +func WithMatchRetryMethod(match bool) Option { + return Option{ + F: func(o *Options) { + o.matchRetryMethod = match + }, + } +} + +// WithServiceCircuitBreak if enable service dimension circuitbreak +func WithServiceCircuitBreak(enable bool) Option { + return Option{ + F: func(o *Options) { + o.enableServiceCircuitBreak = enable + }, + } +} + // WithRouterMetaExtractor configures the extractor for metadata func WithRouterMetaExtractor(routerMetaExtractor routerMetaExtractor) Option { return Option{ @@ -77,8 +106,8 @@ func NewClientSuite(opts ...Option) *clientSuite { RouterMiddleware: NewXDSRouterMiddleware(opts...), Resolver: NewXDSResolver(), }), - NewCircuitBreaker(), - NewRetryPolicy(), + NewCircuitBreaker(opts...), + NewRetryPolicy(opts...), } return &clientSuite{cOpts} } diff --git a/xdssuite/resolver.go b/xdssuite/resolver.go index a7c22a1..0d73e8f 100644 --- a/xdssuite/resolver.go +++ b/xdssuite/resolver.go @@ -92,7 +92,11 @@ func (r *XDSResolver) getEndpoints(ctx context.Context, desc string) ([]*xdsreso return nil, fmt.Errorf("no endpoints for cluster: %s", desc) } // TODO: filter localities - return endpoints.Localities[0].Endpoints, nil + eps := make([]*xdsresource.Endpoint, 0, len(endpoints.Localities)*3) + for _, locality := range endpoints.Localities { + eps = append(eps, locality.Endpoints...) + } + return eps, nil } // Diff implements the Resolver interface. Use DefaultDiff. diff --git a/xdssuite/retry.go b/xdssuite/retry.go index 56587b0..d224972 100644 --- a/xdssuite/retry.go +++ b/xdssuite/retry.go @@ -21,19 +21,19 @@ import ( "sync/atomic" "github.com/cloudwego/kitex/client" + "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/retry" "github.com/cloudwego/kitex/pkg/rpcinfo" + "github.com/cloudwego/kitex/pkg/rpcinfo/remoteinfo" "github.com/kitex-contrib/xds/core/xdsresource" ) -const ( - wildcardRetryKey = "*" -) - type retrySuit struct { lastPolicies atomic.Value *retry.Container + router *XDSRouter + matchMethod bool } func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) { @@ -43,7 +43,6 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) { lastPolicies = val.(map[string]struct{}) } - var wildcarRetryPolicy *retry.Policy thisPolicies := make(map[string]struct{}) defer rc.lastPolicies.Store(thisPolicies) for _, resource := range res { @@ -65,7 +64,10 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) { RetrySameNode: false, StopPolicy: retry.StopPolicy{ MaxRetryTimes: route.RetryPolicy.NumRetries, - MaxDurationMS: uint32(route.RetryPolicy.PerTryTimeout.Milliseconds()), + MaxDurationMS: uint32(route.RetryPolicy.PerTryTimeout.Milliseconds()) * uint32(route.RetryPolicy.NumRetries), + CBPolicy: retry.CBPolicy{ + ErrorRate: route.RetryPolicy.CBErrorRate, + }, }, }, } @@ -88,22 +90,17 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) { } retryPolicy.FailurePolicy.BackOffPolicy = bop rc.Container.NotifyPolicyChange(cluster.Name, retryPolicy) - // FIXME: The logic of retry is before the router, the value of key RouterClusterKey - // can't be found, use wildcard temporary and set the global policy here. And it recommend - // using envoyfilter to config the retry policy. - wildcarRetryPolicy = retryPolicy.DeepCopy() + for _, method := range route.RetryPolicy.Methods { + key := retryPolicyKey(cluster.Name, method) + thisPolicies[key] = struct{}{} + rc.Container.NotifyPolicyChange(key, *retryPolicy.DeepCopy()) + } } } } } - if wildcarRetryPolicy == nil { - wildcarRetryPolicy = &retry.Policy{ - Enable: false, - } - } - rc.Container.NotifyPolicyChange(wildcardRetryKey, *wildcarRetryPolicy) - + klog.Debugf("[XDS] update retry policy: %v", thisPolicies) for key := range lastPolicies { if _, ok := thisPolicies[key]; !ok { rc.Container.DeletePolicy(key) @@ -111,35 +108,62 @@ func updateRetryPolicy(rc *retrySuit, res map[string]xdsresource.Resource) { } } +func retryPolicyKey(cluster, method string) string { + return cluster + "|" + method +} + // NewRetryPolicy integrate xds config and kitex circuitbreaker -func NewRetryPolicy() client.Option { +func NewRetryPolicy(opts ...Option) client.Option { + opt := NewOptions(opts) m := xdsResourceManager.getManager() if m == nil { panic("xds resource manager has not been initialized") } - retry := &retrySuit{ - Container: retry.NewRetryContainer(retry.WithCustomizeKeyFunc(genRetryServiceKey)), + rs := &retrySuit{ + router: NewXDSRouter(opts...), + matchMethod: opt.matchRetryMethod, } + rs.Container = retry.NewRetryContainer(retry.WithCustomizeKeyFunc(rs.genRetryServiceKey)) m.RegisterXDSUpdateHandler(xdsresource.RouteConfigType, func(res map[string]xdsresource.Resource) { - updateRetryPolicy(retry, res) + updateRetryPolicy(rs, res) }) - return client.WithRetryContainer(retry.Container) + return client.WithRetryContainer(rs.Container) } // keep consistent when initialising the circuit breaker suit and updating // the retry policy. -func genRetryServiceKey(ctx context.Context, ri rpcinfo.RPCInfo) string { +func (rs *retrySuit) genRetryServiceKey(ctx context.Context, ri rpcinfo.RPCInfo) string { if ri == nil { return "" } + dest := ri.To() + if dest == nil { + return "" + } + // the value of RouterClusterKey is stored in route process. - // FIXME: The logic of retry is before the router, the value of key RouterClusterKey - // can't be found, use wildcard temporary. - key, _ := ri.To().Tag(RouterClusterKey) - if key == "" { - return wildcardRetryKey + key, exist := ri.To().Tag(RouterClusterKey) + if exist { + return key + } + res, err := rs.router.Route(ctx, ri) + if err != nil { + klog.Warnf("[XDS] get router key failed err: %v", err) + return "" + } + // set destination + _ = remoteinfo.AsRemoteInfo(dest).SetTag(RouterClusterKey, res.ClusterPicked) + remoteinfo.AsRemoteInfo(dest).SetTagLock(RouterClusterKey) + // set timeout + _ = rpcinfo.AsMutableRPCConfig(ri.Config()).SetRPCTimeout(res.RPCTimeout) + return rs.retryPolicyKey(res.ClusterPicked, dest.Method()) +} + +func (rs *retrySuit) retryPolicyKey(cluster, method string) string { + if !rs.matchMethod { + return cluster } - return key + return retryPolicyKey(cluster, method) } diff --git a/xdssuite/retry_test.go b/xdssuite/retry_test.go index 401d8fe..c1ff797 100644 --- a/xdssuite/retry_test.go +++ b/xdssuite/retry_test.go @@ -120,7 +120,7 @@ func TestRetry(t *testing.T) { RetrySameNode: false, StopPolicy: retry.StopPolicy{ MaxRetryTimes: 2, - MaxDurationMS: 1000, + MaxDurationMS: 2000, }, BackOffPolicy: &retry.BackOffPolicy{ BackOffType: retry.FixedBackOffType, @@ -137,7 +137,7 @@ func TestRetry(t *testing.T) { RetrySameNode: false, StopPolicy: retry.StopPolicy{ MaxRetryTimes: 3, - MaxDurationMS: 1000, + MaxDurationMS: 3000, }, BackOffPolicy: &retry.BackOffPolicy{ BackOffType: retry.RandomBackOffType, @@ -182,7 +182,7 @@ func TestRetry(t *testing.T) { RetrySameNode: false, StopPolicy: retry.StopPolicy{ MaxRetryTimes: 2, - MaxDurationMS: 5000, + MaxDurationMS: 10000, }, BackOffPolicy: &retry.BackOffPolicy{ BackOffType: retry.NoneBackOffType, diff --git a/xdssuite/router.go b/xdssuite/router.go index 59ded83..e6ccd8f 100644 --- a/xdssuite/router.go +++ b/xdssuite/router.go @@ -44,6 +44,11 @@ func NewXDSRouterMiddleware(opts ...Option) endpoint.Middleware { return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request, response interface{}) error { ri := rpcinfo.GetRPCInfo(ctx) + _, exist := ri.To().Tag(RouterClusterKey) + if exist { + return next(ctx, request, response) + } + dest := ri.To() if dest == nil { return kerrors.ErrNoDestService