Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): disable statsForAllMethods in grpc stats #5226

Merged
merged 4 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ However, Zone Ingress Token is now deprecated and will be removed in the future.
- By default, the minimum TLS version allowed on servers is TLSv1.2. If you require using TLS < 1.2 you can set `KUMA_GENERAL_TLS_MIN_VERSION`.
- `KUMA_MONITORING_ASSIGNMENT_SERVER_GRPC_PORT` was removed after a long deprecation period use `KUMA_MONITORING_ASSIGNMENT_SERVER_PORT` instead.

### gRPC metrics

With this release, emitting separate statistics for every gRPC method is disabled.
gRPC metrics from different methods are now aggregated under `envoy_cluster_grpc_request_message_count`.
It will be re-enabled again in the future once Envoy with [`replace_dots_in_grpc_service_name`](https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/filters/http/grpc_stats/v3/config.proto#envoy-v3-api-field-extensions-filters-http-grpc-stats-v3-filterconfig-stats-for-all-methods) feature is released.
If you need to enable this setting, you can use ProxyTemplate to patch `envoy.filters.http.grpc_stats` http filter.

## Upgrade to `1.8.x`

### Kumactl
Expand Down
3 changes: 0 additions & 3 deletions pkg/xds/envoy/listeners/v3/grpc_stats_configurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ var _ FilterChainConfigurer = &GrpcStatsConfigurer{}
func (g *GrpcStatsConfigurer) Configure(filterChain *envoy_listener.FilterChain) error {
config := &envoy_grpc_stats.FilterConfig{
EmitFilterState: true,
PerMethodStatSpecifier: &envoy_grpc_stats.FilterConfig_StatsForAllMethods{
StatsForAllMethods: util_proto.Bool(true),
},
}
pbst, err := util_proto.MarshalAnyDeterministic(config)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/xds/envoy/listeners/v3/grpc_stats_configurer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ var _ = Describe("gRPCStatsConfigurer", func() {
typedConfig:
'@type': type.googleapis.com/envoy.extensions.filters.http.grpc_stats.v3.FilterConfig
emitFilterState: true
statsForAllMethods: true
- name: envoy.filters.http.router
typedConfig:
'@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ resources:
typedConfig:
'@type': type.googleapis.com/envoy.extensions.filters.http.grpc_stats.v3.FilterConfig
emitFilterState: true
statsForAllMethods: true
- name: envoy.filters.http.router
typedConfig:
'@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
Expand Down
55 changes: 55 additions & 0 deletions test/e2e_env/universal/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package grpc

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/test/e2e_env/universal/env"
. "github.com/kumahq/kuma/test/framework"
)

func GRPC() {
meshName := "grpc"

BeforeAll(func() {
Expect(NewClusterSetup().
Install(MeshUniversal(meshName)).
Install(TestServerUniversal("test-server", meshName,
WithServiceName("test-server"),
WithProtocol("grpc"),
WithArgs([]string{"grpc", "server", "--port", "8080"}),
WithTransparentProxy(true),
)).
Install(TestServerUniversal("test-client", meshName,
WithServiceName("test-client"),
WithArgs([]string{"grpc", "client", "--address", "test-server.mesh:80"}),
WithTransparentProxy(true),
)).
Setup(env.Cluster)).To(Succeed())
})

E2EAfterAll(func() {
Expect(env.Cluster.DeleteMeshApps(meshName)).To(Succeed())
Expect(env.Cluster.DeleteMesh(meshName)).To(Succeed())
})

It("should emit stats from the server", func() {
Eventually(func(g Gomega) {
stdout, _, err := env.Cluster.Exec("", "", "test-server",
"curl", "-v", "--fail", "http://localhost:9901/stats?format=prometheus")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stdout).To(ContainSubstring(`envoy_cluster_grpc_request_message_count{envoy_cluster_name="localhost_8080"}`))
g.Expect(stdout).To(ContainSubstring(`envoy_cluster_grpc_response_message_count{envoy_cluster_name="localhost_8080"}`))
}, "30s", "1s").Should(Succeed())
})

It("should emit stats from the client", func() {
Eventually(func(g Gomega) {
stdout, _, err := env.Cluster.Exec("", "", "test-client",
"curl", "-v", "--fail", "http://localhost:9901/stats?format=prometheus")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(stdout).To(ContainSubstring(`envoy_cluster_grpc_request_message_count{envoy_cluster_name="test-server"}`))
g.Expect(stdout).To(ContainSubstring(`envoy_cluster_grpc_response_message_count{envoy_cluster_name="test-server"}`))
}, "30s", "1s").Should(Succeed())
})
}
2 changes: 2 additions & 0 deletions test/e2e_env/universal/universal_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kumahq/kuma/test/e2e_env/universal/env"
"github.com/kumahq/kuma/test/e2e_env/universal/externalservices"
"github.com/kumahq/kuma/test/e2e_env/universal/gateway"
"github.com/kumahq/kuma/test/e2e_env/universal/grpc"
"github.com/kumahq/kuma/test/e2e_env/universal/healthcheck"
"github.com/kumahq/kuma/test/e2e_env/universal/inspect"
"github.com/kumahq/kuma/test/e2e_env/universal/matching"
Expand Down Expand Up @@ -114,3 +115,4 @@ var _ = Describe("Zone Egress", zoneegress.ExternalServices, Ordered)
var _ = Describe("Virtual Outbound", virtualoutbound.VirtualOutbound, Ordered)
var _ = Describe("Transparent Proxy", transparentproxy.TransparentProxy, Ordered)
var _ = Describe("Mesh Traffic Permission", meshtrafficpermission.MeshTrafficPermissionUniversal, Ordered)
var _ = Describe("GRPC", grpc.GRPC, Ordered)
4 changes: 3 additions & 1 deletion test/framework/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,9 @@ func TestServerUniversal(name string, mesh string, opt ...AppDeploymentOption) I
serviceAddress = fmt.Sprintf(` serviceAddress: %s`, opts.serviceAddress)
}

args = append(args, "--port", "8080")
if len(args) < 2 || args[1] != "grpc" { // grpc client does not have port
args = append(args, "--port", "8080")
}
appYaml := fmt.Sprintf(`
type: Dataplane
mesh: %s
Expand Down
86 changes: 45 additions & 41 deletions test/server/cmd/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -28,54 +29,57 @@ func newGRPCClientCmd() *cobra.Command {
# sends "Request #${n_of_streams}.${n_of_requests}" every 2 seconds.
test-server grpc client address="localhost:8080"`,
RunE: func(cmd *cobra.Command, _ []string) error {
conn, err := grpc.Dial(args.address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 10 * time.Second,
}),
)
if err != nil {
grpcClientLog.Error(err, "dial failed")
return err
}
defer conn.Close()

c := api.NewGreeterClient(conn)

streamCounter := 0
for {
clientStream, err := c.SayHellos(context.Background())
if err != nil {
grpcClientLog.Error(err, "SayHellos failed")
return err
streamCounter++
if err := startSendingRequests(args.address, streamCounter); err != nil {
grpcClientLog.Error(err, "sending requests failed")
}
time.Sleep(1 * time.Second)
}
},
}
cmd.PersistentFlags().StringVar(&args.address, "address", "localhost:8080", "server address")
return cmd
}

requestCounter := 0
for {
err := clientStream.Send(&api.HelloRequest{
Name: fmt.Sprintf("Request #%d.%d", streamCounter, requestCounter),
})
if err != nil {
grpcClientLog.Error(err, "Send failed")
break
}
func startSendingRequests(address string, streamCounter int) error {
conn, err := grpc.Dial(address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 10 * time.Second,
}),
)
if err != nil {
return errors.Wrap(err, "dial failed")
}
defer conn.Close()

resp, err := clientStream.Recv()
if err != nil {
grpcClientLog.Error(err, "Recv failed")
break
}
grpcClientLog.Info("received response", "msg", resp.GetMessage())
c := api.NewGreeterClient(conn)
for {
clientStream, err := c.SayHellos(context.Background())
if err != nil {
return errors.Wrap(err, "SayHellos failed")
}

time.Sleep(2 * time.Second)
requestCounter++
}
requestCounter := 0
for {
err := clientStream.Send(&api.HelloRequest{
Name: fmt.Sprintf("Request #%d.%d", streamCounter, requestCounter),
})
if err != nil {
return errors.Wrap(err, "Send failed")
}

streamCounter++
resp, err := clientStream.Recv()
if err != nil {
return errors.Wrap(err, "Recv failed")
}
},
grpcClientLog.Info("received response", "msg", resp.GetMessage())

time.Sleep(2 * time.Second)
requestCounter++
}
}
cmd.PersistentFlags().StringVar(&args.address, "address", "localhost:8080", "server address")
return cmd
}