diff --git a/api/models/cluster_create.go b/api/models/cluster_create.go index c314f30..c8f5692 100644 --- a/api/models/cluster_create.go +++ b/api/models/cluster_create.go @@ -20,6 +20,12 @@ type ClusterCreate struct { // annotations Annotations map[string]string `json:"annotations,omitempty"` + // cluster name + Name string `json:"name,omitempty"` + + // cluster namespace + Namespace string `json:"namespace,omitempty"` + // cluster version Version string `json:"version,omitempty"` } diff --git a/api/models/cluster_delete.go b/api/models/cluster_delete.go index b92a8bc..972989c 100644 --- a/api/models/cluster_delete.go +++ b/api/models/cluster_delete.go @@ -19,6 +19,12 @@ type ClusterDelete struct { // true means force delete, default false Force *bool `json:"force,omitempty"` + + // cluster name + Name string `json:"name,omitempty"` + + // cluster namespace + Namespace string `json:"namespace,omitempty"` } // Validate validates this cluster delete diff --git a/api/models/cluster_get.go b/api/models/cluster_get.go new file mode 100644 index 0000000..5952b8b --- /dev/null +++ b/api/models/cluster_get.go @@ -0,0 +1,53 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" +) + +// ClusterGet cluster get params +// +// swagger:model Cluster_get +type ClusterGet struct { + + // cluster name + Name string `json:"name,omitempty"` + + // cluster namespace + Namespace string `json:"namespace,omitempty"` +} + +// Validate validates this cluster get +func (m *ClusterGet) Validate(formats strfmt.Registry) error { + return nil +} + +// ContextValidate validates this cluster get based on context it is used +func (m *ClusterGet) ContextValidate(ctx context.Context, formats strfmt.Registry) error { + return nil +} + +// MarshalBinary interface implementation +func (m *ClusterGet) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation +func (m *ClusterGet) UnmarshalBinary(b []byte) error { + var res ClusterGet + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} diff --git a/api/models/cluster_info.go b/api/models/cluster_info.go index 9b9ad47..f4b0ca3 100644 --- a/api/models/cluster_info.go +++ b/api/models/cluster_info.go @@ -17,6 +17,12 @@ import ( // swagger:model Cluster_info type ClusterInfo struct { + // cluster name + Name string `json:"name,omitempty"` + + // cluster namespace + Namespace string `json:"namespace,omitempty"` + // cluster status Status string `json:"status,omitempty"` diff --git a/api/models/cluster_patch.go b/api/models/cluster_patch.go index 6c231bc..30815a3 100644 --- a/api/models/cluster_patch.go +++ b/api/models/cluster_patch.go @@ -20,6 +20,12 @@ type ClusterPatch struct { // annotations Annotations map[string]string `json:"annotations,omitempty"` + // cluster name + Name string `json:"name,omitempty"` + + // cluster namespace + Namespace string `json:"namespace,omitempty"` + // cluster version Version string `json:"version,omitempty"` } diff --git a/api/restapi/embedded_spec.go b/api/restapi/embedded_spec.go index 2c7f3de..da29ee3 100644 --- a/api/restapi/embedded_spec.go +++ b/api/restapi/embedded_spec.go @@ -42,6 +42,17 @@ func init() { "cluster" ], "operationId": "getCluster", + "parameters": [ + { + "name": "get", + "in": "body", + "required": true, + "schema": { + "type": "object", + "$ref": "#/definitions/Cluster_get" + } + } + ], "responses": { "200": { "description": "OK", @@ -553,6 +564,14 @@ func init() { "type": "string" } }, + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + }, "version": { "description": "cluster version", "type": "string" @@ -567,6 +586,28 @@ func init() { "description": "true means force delete, default false", "type": "boolean", "default": false + }, + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + } + } + }, + "Cluster_get": { + "description": "cluster get params", + "type": "object", + "properties": { + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" } } }, @@ -574,6 +615,14 @@ func init() { "description": "cluster info", "type": "object", "properties": { + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + }, "status": { "description": "cluster status", "type": "string" @@ -597,6 +646,14 @@ func init() { "type": "string" } }, + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + }, "version": { "description": "cluster version", "type": "string" @@ -785,6 +842,17 @@ func init() { "cluster" ], "operationId": "getCluster", + "parameters": [ + { + "name": "get", + "in": "body", + "required": true, + "schema": { + "type": "object", + "$ref": "#/definitions/Cluster_get" + } + } + ], "responses": { "200": { "description": "OK", @@ -1296,6 +1364,14 @@ func init() { "type": "string" } }, + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + }, "version": { "description": "cluster version", "type": "string" @@ -1310,6 +1386,28 @@ func init() { "description": "true means force delete, default false", "type": "boolean", "default": false + }, + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + } + } + }, + "Cluster_get": { + "description": "cluster get params", + "type": "object", + "properties": { + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" } } }, @@ -1317,6 +1415,14 @@ func init() { "description": "cluster info", "type": "object", "properties": { + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + }, "status": { "description": "cluster status", "type": "string" @@ -1340,6 +1446,14 @@ func init() { "type": "string" } }, + "name": { + "description": "cluster name", + "type": "string" + }, + "namespace": { + "description": "cluster namespace", + "type": "string" + }, "version": { "description": "cluster version", "type": "string" diff --git a/api/restapi/operations/cluster/get_cluster_parameters.go b/api/restapi/operations/cluster/get_cluster_parameters.go index 6c701e4..bd557f6 100644 --- a/api/restapi/operations/cluster/get_cluster_parameters.go +++ b/api/restapi/operations/cluster/get_cluster_parameters.go @@ -6,10 +6,16 @@ package cluster // Editing this file might prove futile when you re-run the swagger generate command import ( + "context" + "io" "net/http" "github.com/go-openapi/errors" + "github.com/go-openapi/runtime" "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/validate" + + "github.com/vanus-labs/vanus-operator/api/models" ) // NewGetClusterParams creates a new GetClusterParams object @@ -28,6 +34,12 @@ type GetClusterParams struct { // HTTP Request Object HTTPRequest *http.Request `json:"-"` + + /* + Required: true + In: body + */ + Get *models.ClusterGet } // BindRequest both binds and validates a request, it assumes that complex things implement a Validatable(strfmt.Registry) error interface @@ -39,6 +51,33 @@ func (o *GetClusterParams) BindRequest(r *http.Request, route *middleware.Matche o.HTTPRequest = r + if runtime.HasBody(r) { + defer r.Body.Close() + var body models.ClusterGet + if err := route.Consumer.Consume(r.Body, &body); err != nil { + if err == io.EOF { + res = append(res, errors.Required("get", "body", "")) + } else { + res = append(res, errors.NewParseError("get", "body", "", err)) + } + } else { + // validate body object + if err := body.Validate(route.Formats); err != nil { + res = append(res, err) + } + + ctx := validate.WithOperationRequest(context.Background()) + if err := body.ContextValidate(ctx, route.Formats); err != nil { + res = append(res, err) + } + + if len(res) == 0 { + o.Get = &body + } + } + } else { + res = append(res, errors.Required("get", "body", "")) + } if len(res) > 0 { return errors.CompositeValidationError(res...) } diff --git a/api/swagger.yaml b/api/swagger.yaml index 009d4a4..e39650b 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -21,6 +21,13 @@ paths: - "cluster" description: "get Cluster" operationId: "getCluster" + parameters: + - name: "get" + in: "body" + required: true + schema: + type: "object" + $ref: "#/definitions/Cluster_get" responses: '200': description: OK @@ -410,6 +417,12 @@ definitions: type: "object" description: "cluster info" properties: + name: + type: "string" + description: "cluster name" + namespace: + type: "string" + description: "cluster namespace" version: type: "string" description: "cluster version" @@ -417,10 +430,27 @@ definitions: type: "string" description: "cluster status" + Cluster_get: + type: object + description: "cluster get params" + properties: + name: + type: "string" + description: "cluster name" + namespace: + type: "string" + description: "cluster namespace" + Cluster_create: type: "object" description: "cluster create params" properties: + name: + type: "string" + description: "cluster name" + namespace: + type: "string" + description: "cluster namespace" version: type: "string" description: "cluster version" @@ -435,6 +465,12 @@ definitions: type: object description: "cluster patch params" properties: + name: + type: "string" + description: "cluster name" + namespace: + type: "string" + description: "cluster namespace" version: type: "string" description: "cluster version" @@ -449,6 +485,12 @@ definitions: type: object description: "cluster create params" properties: + name: + type: "string" + description: "cluster name" + namespace: + type: "string" + description: "cluster namespace" force: type: "boolean" default: false @@ -540,4 +582,3 @@ definitions: reason: type: "string" description: "connector status reason" - diff --git a/controllers/controller_component.go b/controllers/controller_component.go index 59f71c9..230f41b 100644 --- a/controllers/controller_component.go +++ b/controllers/controller_component.go @@ -41,15 +41,15 @@ func (r *CoreReconciler) handleController(ctx context.Context, logger logr.Logge controller := r.generateController(core) // Check if the statefulSet already exists, if not create a new one sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultControllerComponentName, Namespace: cons.DefaultNamespace}, sts) + err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultControllerComponentName, Namespace: core.Namespace}, sts) if err != nil { if errors.IsNotFound(err) { // Create Controller ConfigMap - controllerConfigMap := r.generateConfigMapForController(core) - logger.Info("Creating a new Controller ConfigMap.", "Namespace", controllerConfigMap.Namespace, "Name", controllerConfigMap.Name) - err = r.Create(ctx, controllerConfigMap) + cm := r.generateConfigMapForController(core) + logger.Info("Creating a new Controller ConfigMap.", "Namespace", cm.Namespace, "Name", cm.Name) + err = r.Create(ctx, cm) if err != nil { - logger.Error(err, "Failed to create new Controller ConfigMap", "Namespace", controllerConfigMap.Namespace, "Name", controllerConfigMap.Name) + logger.Error(err, "Failed to create new Controller ConfigMap", "Namespace", cm.Namespace, "Name", cm.Name) return ctrl.Result{}, err } else { logger.Info("Successfully create Controller ConfigMap") @@ -121,7 +121,6 @@ func (r *CoreReconciler) handleController(ctx context.Context, logger logr.Logge } } logger.Info("Controller is ready", "WaitingTime", time.Since(start)) - return ctrl.Result{}, nil } @@ -132,7 +131,7 @@ func (r *CoreReconciler) generateController(core *vanusv1alpha1.Core) *appsv1.St sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultControllerComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.StatefulSetSpec{ @@ -172,7 +171,7 @@ func (r *CoreReconciler) generateController(core *vanusv1alpha1.Core) *appsv1.St func (r *CoreReconciler) waitControllerIsReady(ctx context.Context, core *vanusv1alpha1.Core) (bool, error) { sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultControllerComponentName, Namespace: cons.DefaultNamespace}, sts) + err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultControllerComponentName, Namespace: core.Namespace}, sts) if err != nil { return false, err } @@ -268,7 +267,7 @@ func (r *CoreReconciler) generateConfigMapForController(core *vanusv1alpha1.Core // TODO(jiangkai): automatic generation value.WriteString("secret_encryption_salt: encryption_salt\n") value.WriteString("root_controllers:\n") - for i := int32(0); i < cons.DefaultControllerReplicas; i++ { + for i := int32(0); i < cons.DefaultRootControllerReplicas; i++ { value.WriteString(fmt.Sprintf(" - vanus-root-controller-%d.vanus-root-controller:%s\n", i, core.Annotations[cons.CoreComponentRootControllerSvcPortAnnotation])) } value.WriteString("observability:\n") @@ -278,7 +277,6 @@ func (r *CoreReconciler) generateConfigMapForController(core *vanusv1alpha1.Core value.WriteString(" enable: false\n") value.WriteString(" # OpenTelemetry Collector endpoint, https://opentelemetry.io/docs/collector/getting-started/\n") value.WriteString(" otel_collector: http://127.0.0.1:4318\n") - value.WriteString("cluster:\n") value.WriteString(" component_name: controller\n") value.WriteString(" lease_duration_in_sec: 15\n") @@ -294,13 +292,12 @@ func (r *CoreReconciler) generateConfigMapForController(core *vanusv1alpha1.Core data["controller.yaml"] = value.String() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultControllerConfigMapName, Finalizers: []string{metav1.FinalizerOrphanDependents}, }, Data: data, } - controllerutil.SetControllerReference(core, cm, r.Scheme) return cm } @@ -310,7 +307,7 @@ func (r *CoreReconciler) generateSvcForController(core *vanusv1alpha1.Core) *cor labels := genLabels(cons.DefaultControllerComponentName) svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultControllerComponentName, Labels: labels, Finalizers: []string{metav1.FinalizerOrphanDependents}, @@ -328,7 +325,6 @@ func (r *CoreReconciler) generateSvcForController(core *vanusv1alpha1.Core) *cor }, }, } - controllerutil.SetControllerReference(core, svc, r.Scheme) return svc } diff --git a/controllers/etcd_component.go b/controllers/etcd_component.go index 41b2eca..cf13c9c 100644 --- a/controllers/etcd_component.go +++ b/controllers/etcd_component.go @@ -40,7 +40,7 @@ func (r *CoreReconciler) handleEtcd(ctx context.Context, logger logr.Logger, cor // Create Etcd StatefulSet // Check if the statefulSet already exists, if not create a new one sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultEtcdComponentName, Namespace: cons.DefaultNamespace}, sts) + err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultEtcdComponentName, Namespace: core.Namespace}, sts) if err != nil { if errors.IsNotFound(err) { // Create Etcd Service @@ -80,7 +80,7 @@ func (r *CoreReconciler) handleEtcd(ctx context.Context, logger logr.Logger, cor ticker := time.NewTicker(defaultWaitForReadyTimeout) defer ticker.Stop() for { - ready, err := r.waitEtcdIsReady(ctx) + ready, err := r.waitEtcdIsReady(ctx, core) if err != nil { logger.Error(err, "Wait for Etcd install is ready but got error") return ctrl.Result{RequeueAfter: time.Duration(cons.DefaultRequeueIntervalInSecond) * time.Second}, err @@ -108,9 +108,9 @@ func (r *CoreReconciler) handleEtcd(ctx context.Context, logger logr.Logger, cor return ctrl.Result{}, nil } -func (r *CoreReconciler) waitEtcdIsReady(ctx context.Context) (bool, error) { +func (r *CoreReconciler) waitEtcdIsReady(ctx context.Context, core *vanusv1alpha1.Core) (bool, error) { sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultEtcdComponentName, Namespace: cons.DefaultNamespace}, sts) + err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultEtcdComponentName, Namespace: core.Namespace}, sts) if err != nil { return false, err } @@ -132,7 +132,7 @@ func (r *CoreReconciler) generateEtcd(core *vanusv1alpha1.Core) *appsv1.Stateful sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultEtcdComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.StatefulSetSpec{ @@ -270,8 +270,16 @@ func getEnvForEtcd(core *vanusv1alpha1.Core) []corev1.EnvVar { }, { Name: "ETCD_CLUSTER_DOMAIN", Value: "vanus-etcd.vanus.svc.cluster.local", + }, { + Name: "ETCD_QUOTA_BACKEND_BYTES", + Value: "8589934592", + }, { + Name: "ETCD_AUTO_COMPACTION_MODE", + Value: "periodic", + }, { + Name: "ETCD_AUTO_COMPACTION_RETENTION", + Value: "60m", }} - return defaultEnvs } @@ -338,7 +346,7 @@ func (r *CoreReconciler) generateSvcForEtcd(core *vanusv1alpha1.Core) *corev1.Se labels := genLabels(cons.DefaultEtcdComponentName) svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultEtcdComponentName, Labels: labels, Finalizers: []string{metav1.FinalizerOrphanDependents}, @@ -361,7 +369,6 @@ func (r *CoreReconciler) generateSvcForEtcd(core *vanusv1alpha1.Core) *corev1.Se PublishNotReadyAddresses: true, }, } - controllerutil.SetControllerReference(core, svc, r.Scheme) return svc } diff --git a/controllers/gateway_component.go b/controllers/gateway_component.go index eeaa2c6..0ccf697 100644 --- a/controllers/gateway_component.go +++ b/controllers/gateway_component.go @@ -108,7 +108,7 @@ func (r *CoreReconciler) generateGateway(core *vanusv1alpha1.Core) *appsv1.Deplo dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultGatewayComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.DeploymentSpec{ @@ -122,7 +122,6 @@ func (r *CoreReconciler) generateGateway(core *vanusv1alpha1.Core) *appsv1.Deplo Annotations: annotations, }, Spec: corev1.PodSpec{ - ServiceAccountName: cons.OperatorServiceAccountName, Containers: []corev1.Container{{ Name: cons.DefaultGatewayContainerName, Image: fmt.Sprintf("%s:%s", cons.DefaultGatewayContainerImageName, core.Spec.Version), @@ -221,17 +220,23 @@ func (r *CoreReconciler) generateConfigMapForGateway(core *vanusv1alpha1.Core) * for i := int32(0); i < cons.DefaultControllerReplicas; i++ { value.WriteString(fmt.Sprintf(" - vanus-controller-%d.vanus-controller:%s\n", i, core.Annotations[cons.CoreComponentControllerSvcPortAnnotation])) } + value.WriteString("observability:\n") + value.WriteString(" metrics:\n") + value.WriteString(" enable: true\n") + value.WriteString(" tracing:\n") + value.WriteString(" enable: false\n") + value.WriteString(" # OpenTelemetry Collector endpoint, https://opentelemetry.io/docs/collector/getting-started/\n") + value.WriteString(" otel_collector: http://127.0.0.1:4318\n") data := make(map[string]string) data["gateway.yaml"] = value.String() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultGatewayConfigMapName, Finalizers: []string{metav1.FinalizerOrphanDependents}, }, Data: data, } - controllerutil.SetControllerReference(core, cm, r.Scheme) return cm } @@ -246,7 +251,7 @@ func (r *CoreReconciler) generateSvcForGateway(core *vanusv1alpha1.Core) *corev1 labels := genLabels(cons.DefaultGatewayComponentName) svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultGatewayComponentName, Labels: labels, Finalizers: []string{metav1.FinalizerOrphanDependents}, @@ -272,7 +277,6 @@ func (r *CoreReconciler) generateSvcForGateway(core *vanusv1alpha1.Core) *corev1 Type: corev1.ServiceTypeNodePort, }, } - controllerutil.SetControllerReference(core, svc, r.Scheme) return svc } diff --git a/controllers/root_controller_component.go b/controllers/root_controller_component.go index 399044c..5de9953 100644 --- a/controllers/root_controller_component.go +++ b/controllers/root_controller_component.go @@ -40,15 +40,15 @@ func (r *CoreReconciler) handleRootController(ctx context.Context, logger logr.L rootController := r.generateRootController(core) // Check if the statefulSet already exists, if not create a new one sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultRootControllerComponentName, Namespace: cons.DefaultNamespace}, sts) + err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultRootControllerComponentName, Namespace: core.Namespace}, sts) if err != nil { if errors.IsNotFound(err) { // Create rootController ConfigMap - rootControllerConfigMap := r.generateConfigMapForRootController(core) - logger.Info("Creating a new rootController ConfigMap.", "Namespace", rootControllerConfigMap.Namespace, "Name", rootControllerConfigMap.Name) - err = r.Create(ctx, rootControllerConfigMap) + cm := r.generateConfigMapForRootController(core) + logger.Info("Creating a new rootController ConfigMap.", "Namespace", cm.Namespace, "Name", cm.Name) + err = r.Create(ctx, cm) if err != nil { - logger.Error(err, "Failed to create new rootController ConfigMap", "Namespace", rootControllerConfigMap.Namespace, "Name", rootControllerConfigMap.Name) + logger.Error(err, "Failed to create new rootController ConfigMap", "Namespace", cm.Namespace, "Name", cm.Name) return ctrl.Result{}, err } else { logger.Info("Successfully create rootController ConfigMap") @@ -120,7 +120,6 @@ func (r *CoreReconciler) handleRootController(ctx context.Context, logger logr.L } } logger.Info("rootController is ready", "WaitingTime", time.Since(start)) - return ctrl.Result{}, nil } @@ -131,11 +130,11 @@ func (r *CoreReconciler) generateRootController(core *vanusv1alpha1.Core) *appsv sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultRootControllerComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.StatefulSetSpec{ - Replicas: &cons.DefaultControllerReplicas, + Replicas: &cons.DefaultRootControllerReplicas, Selector: &metav1.LabelSelector{ MatchLabels: labels, }, @@ -171,11 +170,11 @@ func (r *CoreReconciler) generateRootController(core *vanusv1alpha1.Core) *appsv func (r *CoreReconciler) waitRootControllerIsReady(ctx context.Context, core *vanusv1alpha1.Core) (bool, error) { sts := &appsv1.StatefulSet{} - err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultRootControllerComponentName, Namespace: cons.DefaultNamespace}, sts) + err := r.Get(ctx, types.NamespacedName{Name: cons.DefaultRootControllerComponentName, Namespace: core.Namespace}, sts) if err != nil { return false, err } - if sts.Status.Replicas == cons.DefaultControllerReplicas && sts.Status.ReadyReplicas == cons.DefaultControllerReplicas && sts.Status.AvailableReplicas == cons.DefaultControllerReplicas { + if sts.Status.Replicas == cons.DefaultRootControllerReplicas && sts.Status.ReadyReplicas == cons.DefaultRootControllerReplicas && sts.Status.AvailableReplicas == cons.DefaultRootControllerReplicas { return true, nil } return false, nil @@ -251,7 +250,6 @@ func (r *CoreReconciler) generateConfigMapForRootController(core *vanusv1alpha1. value.WriteString(" enable: false\n") value.WriteString(" # OpenTelemetry Collector endpoint, https://opentelemetry.io/docs/collector/getting-started/\n") value.WriteString(" otel_collector: http://127.0.0.1:4318\n") - value.WriteString("cluster:\n") value.WriteString(" component_name: root-controller\n") value.WriteString(" lease_duration_in_sec: 15\n") @@ -260,19 +258,18 @@ func (r *CoreReconciler) generateConfigMapForRootController(core *vanusv1alpha1. value.WriteString(fmt.Sprintf(" - vanus-etcd-%d.vanus-etcd:%s\n", i, core.Annotations[cons.CoreComponentEtcdPortClientAnnotation])) } value.WriteString(" topology:\n") - for i := int32(0); i < cons.DefaultControllerReplicas; i++ { + for i := int32(0); i < cons.DefaultRootControllerReplicas; i++ { value.WriteString(fmt.Sprintf(" vanus-root-controller-%d: vanus-root-controller-%d.vanus-root-controller.vanus.svc:%s\n", i, i, core.Annotations[cons.CoreComponentRootControllerSvcPortAnnotation])) } data["root.yaml"] = value.String() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultRootControllerConfigMapName, Finalizers: []string{metav1.FinalizerOrphanDependents}, }, Data: data, } - controllerutil.SetControllerReference(core, cm, r.Scheme) return cm } @@ -282,7 +279,7 @@ func (r *CoreReconciler) generateSvcForRootController(core *vanusv1alpha1.Core) labels := genLabels(cons.DefaultRootControllerComponentName) svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultRootControllerComponentName, Labels: labels, Finalizers: []string{metav1.FinalizerOrphanDependents}, @@ -300,7 +297,6 @@ func (r *CoreReconciler) generateSvcForRootController(core *vanusv1alpha1.Core) }, }, } - controllerutil.SetControllerReference(core, svc, r.Scheme) return svc } diff --git a/controllers/store_component.go b/controllers/store_component.go index d2f2bb7..fd0424b 100644 --- a/controllers/store_component.go +++ b/controllers/store_component.go @@ -88,7 +88,7 @@ func (r *CoreReconciler) generateStore(core *vanusv1alpha1.Core) *appsv1.Statefu sts := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultStoreComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.StatefulSetSpec{ @@ -221,7 +221,7 @@ func annotationsForStore() map[string]string { func (r *CoreReconciler) generateConfigMapForStore(core *vanusv1alpha1.Core) *corev1.ConfigMap { value := bytes.Buffer{} value.WriteString(fmt.Sprintf("port: %d\n", cons.DefaultStoreContainerPortGrpc)) - value.WriteString("ip: ${POD_IP}\n") + value.WriteString("ip: ${POD_NAME}.vanus-store.vanus.svc.cluster.local\n") value.WriteString("controllers:\n") for i := int32(0); i < cons.DefaultControllerReplicas; i++ { value.WriteString(fmt.Sprintf(" - vanus-controller-%d.vanus-controller:%s\n", i, core.Annotations[cons.CoreComponentControllerSvcPortAnnotation])) @@ -243,17 +243,28 @@ func (r *CoreReconciler) generateConfigMapForStore(core *vanusv1alpha1.Core) *co value.WriteString(" wal:\n") value.WriteString(" io:\n") value.WriteString(" engine: psync\n") + value.WriteString("vsb:\n") + value.WriteString(" flush_batch_size: 16384\n") + value.WriteString(" io:\n") + value.WriteString(" engine: psync\n") + value.WriteString(" parallel: 16\n") + value.WriteString("observability:\n") + value.WriteString(" metrics:\n") + value.WriteString(" enable: true\n") + value.WriteString(" tracing:\n") + value.WriteString(" enable: false\n") + value.WriteString(" # OpenTelemetry Collector endpoint, https://opentelemetry.io/docs/collector/getting-started/\n") + value.WriteString(" otel_collector: http://127.0.0.1:4318\n") data := make(map[string]string) data["store.yaml"] = value.String() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultStoreConfigMapName, Finalizers: []string{metav1.FinalizerOrphanDependents}, }, Data: data, } - controllerutil.SetControllerReference(core, cm, r.Scheme) return cm } diff --git a/controllers/timer_component.go b/controllers/timer_component.go index 47503d6..4763809 100644 --- a/controllers/timer_component.go +++ b/controllers/timer_component.go @@ -85,7 +85,7 @@ func (r *CoreReconciler) generateTimer(core *vanusv1alpha1.Core) *appsv1.Deploym dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultTimerComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.DeploymentSpec{ @@ -99,7 +99,6 @@ func (r *CoreReconciler) generateTimer(core *vanusv1alpha1.Core) *appsv1.Deploym Annotations: annotations, }, Spec: corev1.PodSpec{ - ServiceAccountName: cons.OperatorServiceAccountName, Containers: []corev1.Container{{ Name: cons.DefaultTimerContainerName, Image: fmt.Sprintf("%s:%s", cons.DefaultTimerContainerImageName, core.Spec.Version), @@ -191,16 +190,22 @@ func (r *CoreReconciler) generateConfigMapForTimer(core *vanusv1alpha1.Core) *co value.WriteString(fmt.Sprintf(" tick: %s\n", core.Annotations[cons.CoreComponentTimerTimingWheelTickAnnotation])) value.WriteString(fmt.Sprintf(" wheel_size: %s\n", core.Annotations[cons.CoreComponentTimerTimingWheelSizeAnnotation])) value.WriteString(fmt.Sprintf(" layers: %s\n", core.Annotations[cons.CoreComponentTimerTimingWheelLayersAnnotation])) + value.WriteString("observability:\n") + value.WriteString(" metrics:\n") + value.WriteString(" enable: true\n") + value.WriteString(" tracing:\n") + value.WriteString(" enable: false\n") + value.WriteString(" # OpenTelemetry Collector endpoint, https://opentelemetry.io/docs/collector/getting-started/\n") + value.WriteString(" otel_collector: http://127.0.0.1:4318\n") data["timer.yaml"] = value.String() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultTimerConfigMapName, Finalizers: []string{metav1.FinalizerOrphanDependents}, }, Data: data, } - controllerutil.SetControllerReference(core, cm, r.Scheme) return cm } diff --git a/controllers/trigger_component.go b/controllers/trigger_component.go index 8c588d3..31732a4 100644 --- a/controllers/trigger_component.go +++ b/controllers/trigger_component.go @@ -87,7 +87,7 @@ func (r *CoreReconciler) generateTrigger(core *vanusv1alpha1.Core) *appsv1.Deplo dep := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: cons.DefaultTriggerComponentName, - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Labels: labels, }, Spec: appsv1.DeploymentSpec{ @@ -101,7 +101,6 @@ func (r *CoreReconciler) generateTrigger(core *vanusv1alpha1.Core) *appsv1.Deplo Annotations: annotations, }, Spec: corev1.PodSpec{ - ServiceAccountName: cons.OperatorServiceAccountName, Containers: []corev1.Container{{ Name: cons.DefaultTriggerContainerName, Image: fmt.Sprintf("%s:%s", cons.DefaultTriggerContainerImageName, core.Spec.Version), @@ -187,20 +186,30 @@ func (r *CoreReconciler) generateConfigMapForTrigger(core *vanusv1alpha1.Core) * value := bytes.Buffer{} value.WriteString(fmt.Sprintf("port: %d\n", cons.DefaultTriggerContainerPortGrpc)) value.WriteString("ip: ${POD_IP}\n") + value.WriteString("send_event_goroutine_size: 1000\n") + value.WriteString("send_event_batch_size: 32\n") + value.WriteString("pull_event_batch_size: 32\n") + value.WriteString("max_uack_event_number: 10000\n") value.WriteString("controllers:\n") for i := int32(0); i < cons.DefaultControllerReplicas; i++ { value.WriteString(fmt.Sprintf(" - vanus-controller-%d.vanus-controller.vanus.svc:%s\n", i, core.Annotations[cons.CoreComponentControllerSvcPortAnnotation])) } + value.WriteString("observability:\n") + value.WriteString(" metrics:\n") + value.WriteString(" enable: true\n") + value.WriteString(" tracing:\n") + value.WriteString(" enable: false\n") + value.WriteString(" # OpenTelemetry Collector endpoint, https://opentelemetry.io/docs/collector/getting-started/\n") + value.WriteString(" otel_collector: http://127.0.0.1:4318\n") data["trigger.yaml"] = value.String() cm := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, + Namespace: core.Namespace, Name: cons.DefaultTriggerConfigMapName, Finalizers: []string{metav1.FinalizerOrphanDependents}, }, Data: data, } - controllerutil.SetControllerReference(core, cm, r.Scheme) return cm } diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 5929b2d..b7dfd50 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -36,7 +36,7 @@ const ( DefaultControllerComponentName = "vanus-controller" DefaultControllerContainerName = "controller" DefaultControllerPortGrpc = 2048 - DefaultControllerSegmentCapacity = "64Mi" // 64Mi: 64*1024*1024=67108864 + DefaultControllerSegmentCapacity = "4Mi" // 4Mi: 4*1024*1024=4194304 DefaultControllerContainerImageName = "public.ecr.aws/vanus/controller" DefaultControllerConfigMapName = "config-controller" // Root Controller @@ -52,14 +52,14 @@ const ( DefaultEtcdPortPeer = 2380 DefaultEtcdContainerImageName = "public.ecr.aws/vanus/etcd:v3.5.7" // from 'docker.io/bitnami/etcd:3.5.7-debian-11-r9' DefaultEtcdVolumeMountPath = "/bitnami/etcd" - DefaultEtcdStorageSize = "10Gi" + DefaultEtcdStorageSize = "20Gi" // Store DefaultStoreComponentName = "vanus-store" DefaultStoreContainerName = "store" DefaultStoreContainerPortGrpc = 11811 DefaultStoreContainerImageName = "public.ecr.aws/vanus/store" DefaultStoreConfigMapName = "config-store" - DefaultStoreStorageSize = "10Gi" + DefaultStoreStorageSize = "20Gi" // Trigger DefaultTriggerComponentName = "vanus-trigger" DefaultTriggerContainerName = "trigger" @@ -95,7 +95,7 @@ const ( ) const ( - // OperatorServiceAccountName is the ServiceAccount name of Vanus cluster + // OperatorServiceAccountName is the ServiceAccount name of Vanus connector OperatorServiceAccountName = "vanus-operator" HeadlessServiceClusterIP = "None" @@ -113,13 +113,14 @@ const ( ) var ( - DefaultControllerReplicas int32 = 2 - DefaultEtcdReplicas int32 = 3 - DefaultStoreReplicas int32 = 3 - DefaultGatewayReplicas int32 = 1 - DefaultTriggerReplicas int32 = 1 - DefaultTimerReplicas int32 = 2 - DefaultConnectorReplicas int32 = 1 + DefaultControllerReplicas int32 = 2 + DefaultRootControllerReplicas int32 = 2 + DefaultEtcdReplicas int32 = 3 + DefaultStoreReplicas int32 = 3 + DefaultGatewayReplicas int32 = 1 + DefaultTriggerReplicas int32 = 1 + DefaultTimerReplicas int32 = 2 + DefaultConnectorReplicas int32 = 1 ) // Annotations supported by Core diff --git a/pkg/apiserver/handlers/cluster.go b/pkg/apiserver/handlers/cluster.go index c4c154c..49cfe20 100644 --- a/pkg/apiserver/handlers/cluster.go +++ b/pkg/apiserver/handlers/cluster.go @@ -22,7 +22,6 @@ import ( "github.com/vanus-labs/vanus-operator/api/models" "github.com/vanus-labs/vanus-operator/api/restapi/operations/cluster" vanusv1alpha1 "github.com/vanus-labs/vanus-operator/api/v1alpha1" - cons "github.com/vanus-labs/vanus-operator/internal/constants" "github.com/vanus-labs/vanus-operator/internal/convert" "github.com/vanus-labs/vanus-operator/pkg/apiserver/utils" "k8s.io/apimachinery/pkg/api/errors" @@ -49,22 +48,29 @@ func (a *Api) createClusterHandler(params cluster.CreateClusterParams) middlewar return utils.Response(400, err) } + // Check if the cluster namespace already exists, if exist, return error + err = a.createNsIfNotExist(params.Create) + if err != nil { + log.Errorf("create namespace %s failed, err: %s\n", params.Create.Namespace, err.Error()) + return utils.Response(500, err) + } + // Check if the cluster already exists, if exist, return error - exist, err := a.checkClusterExist() + exist, err := a.checkClusterExist(params.Create) if err != nil { log.Errorf("check cluster exist failed, err: %s\n", err.Error()) return utils.Response(500, err) } if exist { - log.Warning("Cluster already exist") + log.Warningf("Cluster already exist in namespace %s\n", params.Create.Namespace) return utils.Response(400, stderr.New("cluster already exist")) } - log.Infof("Creating a new cluster %s/%s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName) core := generateCore(params.Create) - resultCore, err := a.createCore(core, cons.DefaultNamespace) + log.Infof("Creating a new cluster %s/%s\n", core.Namespace, core.Name) + resultCore, err := a.createCore(core) if err != nil { - log.Errorf("Failed to create new cluster %s/%s, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Failed to create new cluster %s/%s, err: %s\n", core.Namespace, core.Name, err.Error()) return utils.Response(500, err) } log.Infof("Successfully create cluster: %+v\n", resultCore) @@ -76,17 +82,17 @@ func (a *Api) createClusterHandler(params cluster.CreateClusterParams) middlewar } func (a *Api) deleteClusterHandler(params cluster.DeleteClusterParams) middleware.Responder { - _, err := a.getCore(cons.DefaultNamespace, cons.DefaultVanusCoreName, &metav1.GetOptions{}) + _, err := a.getCore(params.Delete.Namespace, params.Delete.Name, &metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { - log.Errorf("Cluster %s/%s not found, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Cluster %s/%s not found, err: %s\n", params.Delete.Namespace, params.Delete.Name, err.Error()) return utils.Response(404, err) } - log.Errorf("Failed to get cluster %s/%s, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Failed to get cluster %s/%s, err: %s\n", params.Delete.Namespace, params.Delete.Name, err.Error()) return utils.Response(500, err) } - err = a.deleteCore(cons.DefaultNamespace, cons.DefaultVanusCoreName) + err = a.deleteCore(params.Delete.Namespace, params.Delete.Name) if err != nil { log.Errorf("delete cluster failed, err: %s\n", err.Error()) return utils.Response(500, err) @@ -99,19 +105,19 @@ func (a *Api) deleteClusterHandler(params cluster.DeleteClusterParams) middlewar } func (a *Api) patchClusterHandler(params cluster.PatchClusterParams) middleware.Responder { - _, err := a.getCore(cons.DefaultNamespace, cons.DefaultVanusCoreName, &metav1.GetOptions{}) + _, err := a.getCore(params.Patch.Namespace, params.Patch.Name, &metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { - log.Errorf("Cluster %s/%s not found, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Cluster %s/%s not found, err: %s\n", params.Patch.Namespace, params.Patch.Name, err.Error()) return utils.Response(404, err) } - log.Errorf("Failed to get cluster %s/%s, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Failed to get cluster %s/%s, err: %s\n", params.Patch.Namespace, params.Patch.Name, err.Error()) return utils.Response(500, err) } core := &vanusv1alpha1.Core{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, - Name: cons.DefaultVanusCoreName, + Namespace: params.Patch.Namespace, + Name: params.Patch.Name, Annotations: params.Patch.Annotations, }, Spec: vanusv1alpha1.CoreSpec{ @@ -120,7 +126,7 @@ func (a *Api) patchClusterHandler(params cluster.PatchClusterParams) middleware. } resultCore, err := a.patchCore(core) if err != nil { - log.Errorf("Failed to patch cluster %s/%s, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Failed to patch cluster %s/%s, err: %s\n", params.Patch.Namespace, params.Patch.Name, err.Error()) return utils.Response(500, err) } log.Infof("Successfully patch cluster: %+v\n", resultCore) @@ -131,20 +137,22 @@ func (a *Api) patchClusterHandler(params cluster.PatchClusterParams) middleware. } func (a *Api) getClusterHandler(params cluster.GetClusterParams) middleware.Responder { - vanus, err := a.getCore(cons.DefaultNamespace, cons.DefaultVanusCoreName, &metav1.GetOptions{}) + core, err := a.getCore(params.Get.Namespace, params.Get.Name, &metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { - log.Errorf("Cluster %s/%s not found, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Cluster %s/%s not found, err: %s\n", params.Get.Namespace, params.Get.Name, err.Error()) return utils.Response(404, err) } - log.Errorf("Failed to get cluster %s/%s, err: %s\n", cons.DefaultNamespace, cons.DefaultVanusCoreName, err.Error()) + log.Errorf("Failed to get cluster %s/%s, err: %s\n", params.Get.Namespace, params.Get.Name, err.Error()) return utils.Response(500, err) } return cluster.NewGetClusterOK().WithPayload(&cluster.GetClusterOKBody{ Code: convert.PtrInt32(200), Data: &models.ClusterInfo{ - Version: vanus.Spec.Version, - Status: "Running", + Name: core.Name, + Namespace: core.Namespace, + Version: core.Spec.Version, + Status: "Running", }, Message: convert.PtrS("success"), }) @@ -161,20 +169,32 @@ func (a *Api) checkParamsValid(cluster *models.ClusterCreate) (bool, error) { return true, nil } -func (a *Api) checkClusterExist() (bool, error) { - _, exist, err := a.existCore(cons.DefaultNamespace, cons.DefaultVanusCoreName, &metav1.GetOptions{}) +func (a *Api) checkClusterExist(cluster *models.ClusterCreate) (bool, error) { + clusterList, err := a.listCore(cluster.Namespace, &metav1.ListOptions{}) if err != nil { - log.Errorf("Failed to get cluster, err: %s\n", err.Error()) + log.Errorf("Failed to list cluster in namespace %s, err: %s\n", cluster.Namespace, err.Error()) return false, err } - return exist, err + return len(clusterList.Items) != 0, err +} + +func (a *Api) createNsIfNotExist(cluster *models.ClusterCreate) error { + exist, err := a.existNamespace(cluster.Namespace) + if err != nil { + log.Errorf("Failed to check namespace %s if exist, err: %s\n", cluster.Namespace, err.Error()) + return err + } + if exist { + return nil + } + return a.createNamespace(cluster.Namespace) } func generateCore(cluster *models.ClusterCreate) *vanusv1alpha1.Core { controller := &vanusv1alpha1.Core{ ObjectMeta: metav1.ObjectMeta{ - Namespace: cons.DefaultNamespace, - Name: cons.DefaultVanusCoreName, + Name: cluster.Name, + Namespace: cluster.Namespace, Annotations: cluster.Annotations, }, Spec: vanusv1alpha1.CoreSpec{ diff --git a/pkg/apiserver/handlers/components.go b/pkg/apiserver/handlers/components.go index f80f521..a398a64 100644 --- a/pkg/apiserver/handlers/components.go +++ b/pkg/apiserver/handlers/components.go @@ -35,8 +35,8 @@ const ( ResourceConnector = "connectors" ) -func (a *Api) createCore(vanus *vanusv1alpha1.Core, namespace string) (*vanusv1alpha1.Core, error) { - existCore, exist, err := a.existCore(namespace, vanus.Name, &metav1.GetOptions{}) +func (a *Api) createCore(vanus *vanusv1alpha1.Core) (*vanusv1alpha1.Core, error) { + existCore, exist, err := a.existCore(vanus.Namespace, vanus.Name, &metav1.GetOptions{}) if err != nil { return existCore, err } @@ -46,7 +46,7 @@ func (a *Api) createCore(vanus *vanusv1alpha1.Core, namespace string) (*vanusv1a result := &vanusv1alpha1.Core{} err = a.ctrl.ClientSet(). Post(). - Namespace(namespace). + Namespace(vanus.Namespace). Resource(ResourceCore). Body(vanus). Do(context.TODO()). @@ -281,3 +281,27 @@ func (a *Api) deleteConnectorPVC(connector *vanusv1alpha1.Connector) error { } return nil } + +func (a *Api) existNamespace(name string) (bool, error) { + _, err := a.ctrl.K8SClientSet().CoreV1().Namespaces().Get(a.ctx, name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, err + } + return true, nil +} + +func (a *Api) createNamespace(name string) error { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + _, err := a.ctrl.K8SClientSet().CoreV1().Namespaces().Create(a.ctx, ns, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil +}