Skip to content

Commit

Permalink
Merge pull request #301 from weaveworks/fix-grpc-msg-size
Browse files Browse the repository at this point in the history
make the grpc max message size configurable
  • Loading branch information
Chanwit Kaewkasi authored Aug 17, 2022
2 parents 8ea566a + bee3d2e commit 31d1688
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 45 deletions.
2 changes: 1 addition & 1 deletion charts/tf-controller/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ apiVersion: v2
name: tf-controller
description: The Helm chart for Weave GitOps Terraform Controller
type: application
version: 0.5.0
version: 0.5.1
appVersion: "v0.12.0-rc.1"
3 changes: 2 additions & 1 deletion charts/tf-controller/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ spec:
- --ca-cert-validity-duration={{ .Values.caCertValidityDuration }}
- --cert-rotation-check-frequency={{ .Values.certRotationCheckFrequency }}
- --cert-validity-duration={{ .Values.certValidityDuration }}
- --runner-creation-timeout={{ .Values.runnerCreationTimeout }}
- --runner-creation-timeout={{ .Values.runner.creationTimeout }}
- --runner-grpc-max-message-size={{ .Values.runner.grpc.maxMessageSize }}
command:
- /sbin/tini
- --
Expand Down
4 changes: 3 additions & 1 deletion charts/tf-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ caCertValidityDuration: 168h0m
certRotationCheckFrequency: 30m0s
# Deprecated
certValidityDuration: 6h0m
runnerCreationTimeout: 5m0s
logLevel: info
image:
repository: ghcr.io/weaveworks/tf-controller
Expand All @@ -28,6 +27,9 @@ runner:
image:
repository: ghcr.io/weaveworks/tf-runner
tag: "v0.12.0-rc.1"
grpc:
maxMessageSize: 4
creationTimeout: 5m0s
serviceAccount:
# Specifies whether a service account should be created
create: true
Expand Down
49 changes: 26 additions & 23 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,22 @@ func init() {

func main() {
var (
metricsAddr string
eventsAddr string
healthAddr string
concurrent int
requeueDependency time.Duration
clientOptions client.Options
logOptions logger.Options
leaderElectionOptions leaderelection.Options
watchAllNamespaces bool
httpRetry int
caValidityDuration time.Duration
certValidityDuration time.Duration
rotationCheckFrequency time.Duration
runnerGRPCPort int
runnerCreationTimeout time.Duration
metricsAddr string
eventsAddr string
healthAddr string
concurrent int
requeueDependency time.Duration
clientOptions client.Options
logOptions logger.Options
leaderElectionOptions leaderelection.Options
watchAllNamespaces bool
httpRetry int
caValidityDuration time.Duration
certValidityDuration time.Duration
rotationCheckFrequency time.Duration
runnerGRPCPort int
runnerCreationTimeout time.Duration
runnerGRPCMaxMessageSize int
)

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
Expand All @@ -96,6 +97,7 @@ func main() {
"The interval that the mTLS certificate rotator should check the certificate validity.")
flag.IntVar(&runnerGRPCPort, "runner-grpc-port", 30000, "The port which will be exposed on the runner pod for gRPC connections.")
flag.DurationVar(&runnerCreationTimeout, "runner-creation-timeout", 120*time.Second, "Timeout for creating a runner pod.")
flag.IntVar(&runnerGRPCMaxMessageSize, "runner-grpc-max-message-size", 4, "The maximum message size for gRPC connections in MiB.")

clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -171,14 +173,15 @@ func main() {
}

reconciler := &controllers.TerraformReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), polling.Options{}),
CertRotator: rotator,
RunnerGRPCPort: runnerGRPCPort,
RunnerCreationTimeout: runnerCreationTimeout,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), polling.Options{}),
CertRotator: rotator,
RunnerGRPCPort: runnerGRPCPort,
RunnerCreationTimeout: runnerCreationTimeout,
RunnerGRPCMaxMessageSize: runnerGRPCMaxMessageSize,
}

if err = reconciler.SetupWithManager(mgr, concurrent, httpRetry); err != nil {
Expand Down
8 changes: 5 additions & 3 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ var (

func main() {
var (
grpcPort int
tlsSecretName string
grpcPort int
tlsSecretName string
grpcMaxMessageSize int
)

flag.IntVar(&grpcPort, "grpc-port", 30000, "The port on which to expose the grpc endpoint.")
flag.StringVar(&tlsSecretName, "tls-secret-name", "", "The TLS secret name.")
flag.IntVar(&grpcMaxMessageSize, "grpc-max-message-size", 4, "The maximum size of gRPC messages in MiB.")
flag.Parse()

addr := fmt.Sprintf(":%d", grpcPort)
Expand All @@ -75,7 +77,7 @@ func main() {
signal.Stop(sigterm)
}()

err := mtls.RunnerServe(podNamespace, addr, tlsSecretName, sigterm)
err := mtls.RunnerServe(podNamespace, addr, tlsSecretName, sigterm, grpcMaxMessageSize)
if err != nil {
log.Fatal(err.Error())
}
Expand Down
14 changes: 8 additions & 6 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,14 @@ func TestMain(m *testing.M) {
}

reconciler = &TerraformReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
EventRecorder: k8sManager.GetEventRecorderFor("tf-controller"),
StatusPoller: polling.NewStatusPoller(k8sManager.GetClient(), k8sManager.GetRESTMapper(), polling.Options{}),
CertRotator: rotator,
RunnerGRPCPort: 30000,
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
EventRecorder: k8sManager.GetEventRecorderFor("tf-controller"),
StatusPoller: polling.NewStatusPoller(k8sManager.GetClient(), k8sManager.GetRESTMapper(), polling.Options{}),
CertRotator: rotator,
RunnerGRPCPort: 30000,
RunnerCreationTimeout: 120 * time.Second,
RunnerGRPCMaxMessageSize: 4,
}

// We use 1 concurrent and 10s httpRetry in the test
Expand Down
18 changes: 10 additions & 8 deletions controllers/terraform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ import (
// TerraformReconciler reconciles a Terraform object
type TerraformReconciler struct {
client.Client
httpClient *retryablehttp.Client
EventRecorder kuberecorder.EventRecorder
MetricsRecorder *metrics.Recorder
StatusPoller *polling.StatusPoller
Scheme *runtime.Scheme
CertRotator *mtls.CertRotator
RunnerGRPCPort int
RunnerCreationTimeout time.Duration
httpClient *retryablehttp.Client
EventRecorder kuberecorder.EventRecorder
MetricsRecorder *metrics.Recorder
StatusPoller *polling.StatusPoller
Scheme *runtime.Scheme
CertRotator *mtls.CertRotator
RunnerGRPCPort int
RunnerCreationTimeout time.Duration
RunnerGRPCMaxMessageSize int
}

//+kubebuilder:rbac:groups=infra.contrib.fluxcd.io,resources=terraforms,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -2067,6 +2068,7 @@ func (r *TerraformReconciler) runnerPodSpec(terraform infrav1.Terraform, tlsSecr
Args: []string{
"--grpc-port", fmt.Sprintf("%d", r.RunnerGRPCPort),
"--tls-secret-name", tlsSecretName,
"--grpc-max-message-size", fmt.Sprintf("%d", r.RunnerGRPCMaxMessageSize),
},
Image: getRunnerPodImage(terraform.Spec.RunnerPodTemplate.Spec.Image),
ImagePullPolicy: corev1.PullIfNotPresent,
Expand Down
6 changes: 4 additions & 2 deletions mtls/runner_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func RunnerServe(namespace, addr string, tlsSecretName string, sigterm chan os.Signal) error {
func RunnerServe(namespace, addr string, tlsSecretName string, sigterm chan os.Signal, maxMessageSizeInMiB int) error {
scheme := runtime.NewScheme()

if err := clientgoscheme.AddToScheme(scheme); err != nil {
Expand Down Expand Up @@ -64,7 +64,9 @@ func RunnerServe(namespace, addr string, tlsSecretName string, sigterm chan os.S
return err
}

grpcServer := grpc.NewServer(grpc.Creds(credentials))
// 30 MB is the maximum allowed payload size for gRPC.
maxMsgSize := maxMessageSizeInMiB * 1024 * 1024
grpcServer := grpc.NewServer(grpc.Creds(credentials), grpc.MaxRecvMsgSize(maxMsgSize), grpc.MaxSendMsgSize(maxMsgSize))
runner.RegisterRunnerServer(grpcServer, runnerServer)

if err := grpcServer.Serve(listener); err != nil {
Expand Down

0 comments on commit 31d1688

Please sign in to comment.