From bee3d2eb7a601c21e785e4f5535b432b83ad3bc6 Mon Sep 17 00:00:00 2001 From: Chanwit Kaewkasi Date: Wed, 17 Aug 2022 00:19:28 +0700 Subject: [PATCH] make the grpc max message size configurable Signed-off-by: Chanwit Kaewkasi --- charts/tf-controller/Chart.yaml | 2 +- .../tf-controller/templates/deployment.yaml | 3 +- charts/tf-controller/values.yaml | 4 +- cmd/manager/main.go | 49 ++++++++++--------- cmd/runner/main.go | 8 +-- controllers/suite_test.go | 14 +++--- controllers/terraform_controller.go | 18 ++++--- mtls/runner_serve.go | 6 ++- 8 files changed, 59 insertions(+), 45 deletions(-) diff --git a/charts/tf-controller/Chart.yaml b/charts/tf-controller/Chart.yaml index 730e1a76..820e7486 100644 --- a/charts/tf-controller/Chart.yaml +++ b/charts/tf-controller/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: tf-controller description: The Helm chart for Weave GitOps Terraform Controller type: application -version: 0.5.0 +version: 0.5.1 appVersion: "v0.12.0-rc.1" diff --git a/charts/tf-controller/templates/deployment.yaml b/charts/tf-controller/templates/deployment.yaml index f5e7ec8c..3d79e3b5 100644 --- a/charts/tf-controller/templates/deployment.yaml +++ b/charts/tf-controller/templates/deployment.yaml @@ -34,7 +34,8 @@ spec: - --ca-cert-validity-duration={{ .Values.caCertValidityDuration }} - --cert-rotation-check-frequency={{ .Values.certRotationCheckFrequency }} - --cert-validity-duration={{ .Values.certValidityDuration }} - - --runner-creation-timeout={{ .Values.runnerCreationTimeout }} + - --runner-creation-timeout={{ .Values.runner.creationTimeout }} + - --runner-grpc-max-message-size={{ .Values.runner.grpc.maxMessageSize }} command: - /sbin/tini - -- diff --git a/charts/tf-controller/values.yaml b/charts/tf-controller/values.yaml index 3b07802f..b2956323 100644 --- a/charts/tf-controller/values.yaml +++ b/charts/tf-controller/values.yaml @@ -4,7 +4,6 @@ caCertValidityDuration: 168h0m certRotationCheckFrequency: 30m0s # Deprecated certValidityDuration: 
6h0m -runnerCreationTimeout: 5m0s logLevel: info image: repository: ghcr.io/weaveworks/tf-controller @@ -28,6 +27,9 @@ runner: image: repository: ghcr.io/weaveworks/tf-runner tag: "v0.12.0-rc.1" + grpc: + maxMessageSize: 4 + creationTimeout: 5m0s serviceAccount: # Specifies whether a service account should be created create: true diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 4da10696..b52863db 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -63,21 +63,22 @@ func init() { func main() { var ( - metricsAddr string - eventsAddr string - healthAddr string - concurrent int - requeueDependency time.Duration - clientOptions client.Options - logOptions logger.Options - leaderElectionOptions leaderelection.Options - watchAllNamespaces bool - httpRetry int - caValidityDuration time.Duration - certValidityDuration time.Duration - rotationCheckFrequency time.Duration - runnerGRPCPort int - runnerCreationTimeout time.Duration + metricsAddr string + eventsAddr string + healthAddr string + concurrent int + requeueDependency time.Duration + clientOptions client.Options + logOptions logger.Options + leaderElectionOptions leaderelection.Options + watchAllNamespaces bool + httpRetry int + caValidityDuration time.Duration + certValidityDuration time.Duration + rotationCheckFrequency time.Duration + runnerGRPCPort int + runnerCreationTimeout time.Duration + runnerGRPCMaxMessageSize int ) flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.") @@ -96,6 +97,7 @@ func main() { "The interval that the mTLS certificate rotator should check the certificate validity.") flag.IntVar(&runnerGRPCPort, "runner-grpc-port", 30000, "The port which will be exposed on the runner pod for gRPC connections.") flag.DurationVar(&runnerCreationTimeout, "runner-creation-timeout", 120*time.Second, "Timeout for creating a runner pod.") + flag.IntVar(&runnerGRPCMaxMessageSize, "runner-grpc-max-message-size", 4, "The maximum message size for 
gRPC connections in MiB.") clientOptions.BindFlags(flag.CommandLine) logOptions.BindFlags(flag.CommandLine) @@ -171,14 +173,15 @@ func main() { } reconciler := &controllers.TerraformReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - EventRecorder: eventRecorder, - MetricsRecorder: metricsRecorder, - StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), polling.Options{}), - CertRotator: rotator, - RunnerGRPCPort: runnerGRPCPort, - RunnerCreationTimeout: runnerCreationTimeout, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + EventRecorder: eventRecorder, + MetricsRecorder: metricsRecorder, + StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), polling.Options{}), + CertRotator: rotator, + RunnerGRPCPort: runnerGRPCPort, + RunnerCreationTimeout: runnerCreationTimeout, + RunnerGRPCMaxMessageSize: runnerGRPCMaxMessageSize, } if err = reconciler.SetupWithManager(mgr, concurrent, httpRetry); err != nil { diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 260e6895..474bedbf 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -47,12 +47,14 @@ var ( func main() { var ( - grpcPort int - tlsSecretName string + grpcPort int + tlsSecretName string + grpcMaxMessageSize int ) flag.IntVar(&grpcPort, "grpc-port", 30000, "The port on which to expose the grpc endpoint.") flag.StringVar(&tlsSecretName, "tls-secret-name", "", "The TLS secret name.") + flag.IntVar(&grpcMaxMessageSize, "grpc-max-message-size", 4, "The maximum size of gRPC messages in MiB.") flag.Parse() addr := fmt.Sprintf(":%d", grpcPort) @@ -75,7 +77,7 @@ func main() { signal.Stop(sigterm) }() - err := mtls.RunnerServe(podNamespace, addr, tlsSecretName, sigterm) + err := mtls.RunnerServe(podNamespace, addr, tlsSecretName, sigterm, grpcMaxMessageSize) if err != nil { log.Fatal(err.Error()) } diff --git a/controllers/suite_test.go b/controllers/suite_test.go index d1b1635a..90ee0d27 100644 --- a/controllers/suite_test.go +++ 
b/controllers/suite_test.go @@ -223,12 +223,14 @@ func TestMain(m *testing.M) { } reconciler = &TerraformReconciler{ - Client: k8sManager.GetClient(), - Scheme: k8sManager.GetScheme(), - EventRecorder: k8sManager.GetEventRecorderFor("tf-controller"), - StatusPoller: polling.NewStatusPoller(k8sManager.GetClient(), k8sManager.GetRESTMapper(), polling.Options{}), - CertRotator: rotator, - RunnerGRPCPort: 30000, + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + EventRecorder: k8sManager.GetEventRecorderFor("tf-controller"), + StatusPoller: polling.NewStatusPoller(k8sManager.GetClient(), k8sManager.GetRESTMapper(), polling.Options{}), + CertRotator: rotator, + RunnerGRPCPort: 30000, + RunnerCreationTimeout: 120 * time.Second, + RunnerGRPCMaxMessageSize: 4, } // We use 1 concurrent and 10s httpRetry in the test diff --git a/controllers/terraform_controller.go b/controllers/terraform_controller.go index 4fc24c46..88f3168e 100644 --- a/controllers/terraform_controller.go +++ b/controllers/terraform_controller.go @@ -74,14 +74,15 @@ import ( // TerraformReconciler reconciles a Terraform object type TerraformReconciler struct { client.Client - httpClient *retryablehttp.Client - EventRecorder kuberecorder.EventRecorder - MetricsRecorder *metrics.Recorder - StatusPoller *polling.StatusPoller - Scheme *runtime.Scheme - CertRotator *mtls.CertRotator - RunnerGRPCPort int - RunnerCreationTimeout time.Duration + httpClient *retryablehttp.Client + EventRecorder kuberecorder.EventRecorder + MetricsRecorder *metrics.Recorder + StatusPoller *polling.StatusPoller + Scheme *runtime.Scheme + CertRotator *mtls.CertRotator + RunnerGRPCPort int + RunnerCreationTimeout time.Duration + RunnerGRPCMaxMessageSize int } //+kubebuilder:rbac:groups=infra.contrib.fluxcd.io,resources=terraforms,verbs=get;list;watch;create;update;patch;delete @@ -2067,6 +2068,7 @@ func (r *TerraformReconciler) runnerPodSpec(terraform infrav1.Terraform, tlsSecr Args: []string{ "--grpc-port", 
fmt.Sprintf("%d", r.RunnerGRPCPort), "--tls-secret-name", tlsSecretName, + "--grpc-max-message-size", fmt.Sprintf("%d", r.RunnerGRPCMaxMessageSize), }, Image: getRunnerPodImage(terraform.Spec.RunnerPodTemplate.Spec.Image), ImagePullPolicy: corev1.PullIfNotPresent, diff --git a/mtls/runner_serve.go b/mtls/runner_serve.go index 530c93e6..b2873512 100644 --- a/mtls/runner_serve.go +++ b/mtls/runner_serve.go @@ -18,7 +18,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func RunnerServe(namespace, addr string, tlsSecretName string, sigterm chan os.Signal) error { +func RunnerServe(namespace, addr string, tlsSecretName string, sigterm chan os.Signal, maxMessageSizeInMiB int) error { scheme := runtime.NewScheme() if err := clientgoscheme.AddToScheme(scheme); err != nil { @@ -64,7 +64,9 @@ func RunnerServe(namespace, addr string, tlsSecretName string, sigterm chan os.S return err } - grpcServer := grpc.NewServer(grpc.Creds(credentials)) + // Convert the configured limit from MiB to bytes; it caps both received and sent gRPC messages. + maxMsgSize := maxMessageSizeInMiB * 1024 * 1024 + grpcServer := grpc.NewServer(grpc.Creds(credentials), grpc.MaxRecvMsgSize(maxMsgSize), grpc.MaxSendMsgSize(maxMsgSize)) runner.RegisterRunnerServer(grpcServer, runnerServer) if err := grpcServer.Serve(listener); err != nil {