From 75b9c6bffee10c3685f7b32d655c0c03c03bc0e9 Mon Sep 17 00:00:00 2001 From: xieydd Date: Mon, 30 Oct 2023 10:48:53 +0800 Subject: [PATCH 1/3] feat: Support volume Signed-off-by: xieydd --- agent/api/types/inference_deployment.go | 34 +--- agent/api/types/secret.go | 15 +- agent/client/const.go | 1 + agent/client/secret_create.go | 16 ++ agent/pkg/docs/docs.go | 153 +++++++++++++-- agent/pkg/k8s/convert_inference.go | 4 +- agent/pkg/k8s/convert_inference_test.go | 4 +- agent/pkg/k8s/secret.go | 184 ++++++++++++++++++ agent/pkg/runtime/inference_create.go | 16 +- agent/pkg/runtime/mock/mock.go | 14 ++ agent/pkg/runtime/runtime.go | 5 + agent/pkg/runtime/secret.go | 15 ++ agent/pkg/server/handler_secret_create.go | 30 +++ agent/pkg/server/server_init_route.go | 4 + agent/pkg/server/validator/validator.go | 7 +- .../pkg/apis/modelzetes/v2alpha1/types.go | 27 +++ modelzetes/pkg/consts/consts.go | 5 + modelzetes/pkg/controller/controller.go | 43 ++++ modelzetes/pkg/controller/deployment.go | 87 +++++++-- modelzetes/pkg/controller/persistentvolume.go | 89 +++++++++ .../pkg/controller/persistentvolumeclaim.go | 59 ++++++ modelzetes/pkg/k8s/secrets.go | 176 ----------------- 22 files changed, 726 insertions(+), 262 deletions(-) create mode 100644 agent/client/secret_create.go create mode 100644 agent/pkg/k8s/secret.go create mode 100644 agent/pkg/runtime/secret.go create mode 100644 agent/pkg/server/handler_secret_create.go create mode 100644 modelzetes/pkg/controller/persistentvolume.go create mode 100644 modelzetes/pkg/controller/persistentvolumeclaim.go diff --git a/agent/api/types/inference_deployment.go b/agent/api/types/inference_deployment.go index ba649b7..97774cb 100644 --- a/agent/api/types/inference_deployment.go +++ b/agent/api/types/inference_deployment.go @@ -1,5 +1,9 @@ package types +import ( + modelzetes "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" +) + // InferenceDeployment represents a request to create or update a Model. type InferenceDeployment struct { Spec InferenceDeploymentSpec `json:"spec"` @@ -14,7 +18,7 @@ type InferenceDeploymentSpec struct { Namespace string `json:"namespace,omitempty"` // Scaling is the scaling configuration for the inference. - Scaling *ScalingConfig `json:"scaling,omitempty"` + Scaling *modelzetes.ScalingConfig `json:"scaling,omitempty"` // Framework is the inference framework. Framework Framework `json:"framework,omitempty"` @@ -48,6 +52,9 @@ type InferenceDeploymentSpec struct { // Resources are the compute resource requirements. Resources *ResourceRequirements `json:"resources,omitempty"` + + // Volumes are the volumes to mount. + Volumes []modelzetes.VolumeConfig `json:"volumes,omitempty"` } // Framework is the inference framework. It is only used to set the default port @@ -63,31 +70,6 @@ const ( FrameworkOther Framework = "other" ) -type ScalingConfig struct { - // MinReplicas is the lower limit for the number of replicas to which the - // autoscaler can scale down. It defaults to 0. - MinReplicas *int32 `json:"min_replicas,omitempty"` - // MaxReplicas is the upper limit for the number of replicas to which the - // autoscaler can scale up. It cannot be less that minReplicas. It defaults - // to 1. - MaxReplicas *int32 `json:"max_replicas,omitempty"` - // TargetLoad is the target load. In capacity mode, it is the expected number of the inflight requests per replica. - TargetLoad *int32 `json:"target_load,omitempty"` - // Type is the scaling type. It can be either "capacity" or "rps". Default is "capacity". - Type *ScalingType `json:"type,omitempty"` - // ZeroDuration is the duration (in seconds) of zero load before scaling down to zero. Default is 5 minutes. - ZeroDuration *int32 `json:"zero_duration,omitempty"` - // StartupDuration is the duration (in seconds) of startup time. - StartupDuration *int32 `json:"startup_duration,omitempty"` -} - -type ScalingType string - -const ( - ScalingTypeCapacity ScalingType = "capacity" - ScalingTypeRPS ScalingType = "rps" -) - // ResourceRequirements describes the compute resource requirements. type ResourceRequirements struct { // Limits describes the maximum amount of compute resources allowed. diff --git a/agent/api/types/secret.go b/agent/api/types/secret.go index 71f9373..dbdd87e 100644 --- a/agent/api/types/secret.go +++ b/agent/api/types/secret.go @@ -8,10 +8,15 @@ type Secret struct { // Namespace if applicable for the secret Namespace string `json:"namespace,omitempty"` - // Value is a string representing the string's value - Value string `json:"value,omitempty"` + // Data contains the secret data. Each key must consist of alphanumeric + // characters, '-', '_' or '.'. The serialized form of the secret data is a + // base64 encoded string, representing the arbitrary (possibly non-string) + // data value here. Described in https://tools.ietf.org/html/rfc4648#section-4 + Data map[string][]byte `json:"data,omitempty"` - // RawValue can be used to provide binary data when - // Value is not set - RawValue []byte `json:"rawValue,omitempty"` + // stringData allows specifying non-binary secret data in string form. + // It is provided as a write-only input field for convenience. + // All keys and values are merged into the data field on write, overwriting any existing values. + // The stringData field is never output when reading from the API. + StringData map[string]string `json:"stringData,omitempty"` } diff --git a/agent/client/const.go b/agent/client/const.go index a01cd92..16b33f7 100644 --- a/agent/client/const.go +++ b/agent/client/const.go @@ -24,6 +24,7 @@ const ( gatewayBuildControlPlanePath = "/system/build" gatewayBuildInstanceControlPlanePath = "/system/build/%s" gatewayImageCacheControlPlanePath = "/system/image-cache" + gatewaySecretControlPlanePath = "/system/secrets" modelzCloudClusterControlPlanePath = "/api/v1/users/%s/clusters/%s" modelzCloudClusterWithUserControlPlanePath = "/api/v1/users/%s/clusters" modelzCloudClusterAPIKeyControlPlanePath = "/api/v1/users/%s/clusters/%s/api_keys" diff --git a/agent/client/secret_create.go b/agent/client/secret_create.go new file mode 100644 index 0000000..791a2da --- /dev/null +++ b/agent/client/secret_create.go @@ -0,0 +1,16 @@ +package client + +import ( + "context" + "net/url" + + "github.com/tensorchord/openmodelz/agent/api/types" +) + +func (cli *Client) SecretCreate(ctx context.Context, secret types.Secret) error { + urlValues := url.Values{} + + resp, err := cli.post(ctx, gatewaySecretControlPlanePath, urlValues, secret, nil) + defer ensureReaderClosed(resp) + return wrapResponseError(err, resp, "secret", secret.Namespace+"/"+secret.Name) +} diff --git a/agent/pkg/docs/docs.go b/agent/pkg/docs/docs.go index d0d7e03..fbc82df 100644 --- a/agent/pkg/docs/docs.go +++ b/agent/pkg/docs/docs.go @@ -1166,6 +1166,40 @@ const docTemplate = `{ } } }, + "/system/secrets": { + "post": { + "description": "Create the secret.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "secret" + ], + "summary": "Create the secret.", + "parameters": [ + { + "description": "Secret", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/types.Secret" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/types.Secret" + } + } + } + } + }, "/system/server/{name}/delete": { "delete": { "description": "Delete a node.", @@ -1472,7 +1506,7 @@ const docTemplate = `{ }, "scaling": { "description": "Scaling is the scaling configuration for the inference.", - "$ref": "#/definitions/types.ScalingConfig" + "$ref": "#/definitions/v2alpha1.ScalingConfig" }, "secrets": { "description": "Secrets list of secrets to be made available to inference.", @@ -1480,6 +1514,13 @@ const docTemplate = `{ "items": { "type": "string" } + }, + "volumes": { + "description": "Volumes are the volumes to mount.", + "type": "array", + "items": { + "$ref": "#/definitions/v2alpha1.VolumeConfig" + } } } }, @@ -1585,6 +1626,10 @@ const docTemplate = `{ "osImage": { "description": "OS Image reported by the node from /etc/os-release (e.g. Debian GNU/Linux 7 (wheezy)).", "type": "string" + }, + "resourceType": { + "description": "The Resource Type reported by the node", + "type": "string" } } }, @@ -1638,32 +1683,33 @@ const docTemplate = `{ } } }, - "types.ScalingConfig": { + "types.Secret": { "type": "object", "properties": { - "max_replicas": { - "description": "MaxReplicas is the upper limit for the number of replicas to which the\nautoscaler can scale up. It cannot be less that minReplicas. It defaults\nto 1.", - "type": "integer" - }, - "min_replicas": { - "description": "MinReplicas is the lower limit for the number of replicas to which the\nautoscaler can scale down. It defaults to 0.", - "type": "integer" - }, - "startup_duration": { - "description": "StartupDuration is the duration (in seconds) of startup time.", - "type": "integer" + "data": { + "description": "Data contains the secret data. Each key must consist of alphanumeric\ncharacters, '-', '_' or '.'. The serialized form of the secret data is a\nbase64 encoded string, representing the arbitrary (possibly non-string)\ndata value here. Described in https://tools.ietf.org/html/rfc4648#section-4", + "type": "object", + "additionalProperties": { + "type": "array", + "items": { + "type": "integer" + } + } }, - "target_load": { - "description": "TargetLoad is the target load. In capacity mode, it is the expected number of the inflight requests per replica.", - "type": "integer" + "name": { + "description": "Name of the secret", + "type": "string" }, - "type": { - "description": "Type is the scaling type. It can be either \"capacity\" or \"rps\". Default is \"capacity\".", + "namespace": { + "description": "Namespace if applicable for the secret", "type": "string" }, - "zero_duration": { - "description": "ZeroDuration is the duration (in seconds) of zero load before scaling down to zero. Default is 5 minutes.", - "type": "integer" + "stringData": { + "description": "stringData allows specifying non-binary secret data in string form.\nIt is provided as a write-only input field for convenience.\nAll keys and values are merged into the data field on write, overwriting any existing values.\nThe stringData field is never output when reading from the API.", + "type": "object", + "additionalProperties": { + "type": "string" + } } } }, @@ -1737,6 +1783,71 @@ const docTemplate = `{ "type": "string" } } + }, + "v2alpha1.ScalingConfig": { + "type": "object", + "properties": { + "max_replicas": { + "description": "MaxReplicas is the upper limit for the number of replicas to which the\nautoscaler can scale up. It cannot be less that minReplicas. It defaults\nto 1.", + "type": "integer" + }, + "min_replicas": { + "description": "MinReplicas is the lower limit for the number of replicas to which the\nautoscaler can scale down. It defaults to 0.", + "type": "integer" + }, + "startup_duration": { + "description": "StartupDuration is the duration of startup time.", + "type": "integer" + }, + "target_load": { + "description": "TargetLoad is the target load. In capacity mode, it is the expected number of the inflight requests per replica.", + "type": "integer" + }, + "type": { + "description": "Type is the scaling type. It can be either \"capacity\" or \"rps\". Default is \"capacity\".", + "type": "string" + }, + "zero_duration": { + "description": "ZeroDuration is the duration of zero load before scaling down to zero. Default is 5 minutes.", + "type": "integer" + } + } + }, + "v2alpha1.VolumeConfig": { + "type": "object", + "properties": { + "mount_option": { + "description": "MountOption is the mount option.", + "type": "string" + }, + "mount_path": { + "description": "MountPath is the path in pod to mount the volume.", + "type": "string" + }, + "name": { + "description": "Name is the name of the volume.", + "type": "string" + }, + "node_name": { + "description": "NodeNames are the name list of the node. It is only used for local volume.", + "type": "array", + "items": { + "type": "string" + } + }, + "secret_name": { + "description": "SecretName is the name of the secret. It is only used for object storage volume.", + "type": "string" + }, + "sub_path": { + "description": "SubPath is the sub path of the volume.", + "type": "string" + }, + "type": { + "description": "Type of the volume.", + "type": "string" + } + } } } }` diff --git a/agent/pkg/k8s/convert_inference.go b/agent/pkg/k8s/convert_inference.go index ca4bc9f..c5bf23d 100644 --- a/agent/pkg/k8s/convert_inference.go +++ b/agent/pkg/k8s/convert_inference.go @@ -31,7 +31,7 @@ func AsInferenceDeployment(inf *v2alpha1.Inference, item *appsv1.Deployment) *ty } if inf.Spec.Scaling != nil { - res.Spec.Scaling = &types.ScalingConfig{ + res.Spec.Scaling = &v2alpha1.ScalingConfig{ MinReplicas: inf.Spec.Scaling.MinReplicas, MaxReplicas: inf.Spec.Scaling.MaxReplicas, TargetLoad: inf.Spec.Scaling.TargetLoad, @@ -39,7 +39,7 @@ func AsInferenceDeployment(inf *v2alpha1.Inference, item *appsv1.Deployment) *ty StartupDuration: inf.Spec.Scaling.StartupDuration, } if inf.Spec.Scaling.Type != nil { - typ := types.ScalingType(*inf.Spec.Scaling.Type) + typ := v2alpha1.ScalingType(*inf.Spec.Scaling.Type) res.Spec.Scaling.Type = &typ } } diff --git a/agent/pkg/k8s/convert_inference_test.go b/agent/pkg/k8s/convert_inference_test.go index b091042..5f5f8e6 100644 --- a/agent/pkg/k8s/convert_inference_test.go +++ b/agent/pkg/k8s/convert_inference_test.go @@ -112,8 +112,8 @@ var _ = Describe("agent/pkg/k8s/convert_inference", func() { }), expect: Ptr(types.InferenceDeployment{ Spec: types.InferenceDeploymentSpec{ - Scaling: Ptr(types.ScalingConfig{ - Type: Ptr(types.ScalingTypeCapacity), + Scaling: Ptr(v2alpha1.ScalingConfig{ + Type: Ptr(v2alpha1.ScalingTypeCapacity), }), }, Status: types.InferenceDeploymentStatus{ diff --git a/agent/pkg/k8s/secret.go b/agent/pkg/k8s/secret.go new file mode 100644 index 0000000..98ded0b --- /dev/null +++ b/agent/pkg/k8s/secret.go @@ -0,0 +1,184 @@ +package k8s + +import ( + "context" + "errors" + "fmt" + "log" + "strings" + + "github.com/tensorchord/openmodelz/agent/api/types" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +const ( + secretsMountPath = "/var/modelz/secrets" + secretLabel = "app.kubernetes.io/managed-by" + secretLabelValue = "modelz" + secretsProjectVolumeNameTmpl = "projected-secrets" +) + +// SecretsClient exposes the standardized CRUD behaviors for Kubernetes secrets. These methods +// will ensure that the secrets are structured and labelled correctly for use by the modelz system. +type SecretsClient interface { + // List returns a list of available function secrets. Only the names are returned + // to ensure we do not accidentally read or print the sensitive values during + // read operations. + List(namespace string) (names []string, err error) + // Create adds a new secret, with the appropriate labels and structure to be + // used as a function secret. + Create(secret types.Secret) error + // Replace updates the value of a function secret + Replace(secret types.Secret) error + // Delete removes a function secret + Delete(name string, namespace string) error + // GetSecrets queries Kubernetes for a list of secrets by name in the given k8s namespace. + // This should only be used if you need access to the actual secret structure/value. Specifically, + // inside the FunctionFactory. + GetSecrets(namespace string, secretNames []string) (map[string]*apiv1.Secret, error) +} + +// SecretInterfacer exposes the SecretInterface getter for the k8s client. +// This is implemented by the CoreV1Interface() interface in the Kubernetes client. +// The SecretsClient only needs this one interface, but needs to be able to set the +// namespaces when the interface is instantiated, meaning, we need the Getter and not the +// SecretInterface itself. +type SecretInterfacer interface { + // Secrets returns a SecretInterface scoped to the specified namespace + Secrets(namespace string) typedv1.SecretInterface +} + +type SecretClient struct { + kube SecretInterfacer +} + +// NewSecretsClient constructs a new SecretsClient using the provided Kubernetes client. +func NewSecretClient(kube kubernetes.Interface) SecretsClient { + return &SecretClient{ + kube: kube.CoreV1(), + } +} + +func (c SecretClient) List(namespace string) (names []string, err error) { + res, err := c.kube.Secrets(namespace).List(context.TODO(), c.selector()) + if err != nil { + log.Printf("failed to list secrets in %s: %v\n", namespace, err) + return nil, err + } + + names = make([]string, len(res.Items)) + for idx, item := range res.Items { + // this is safe because size of names matches res.Items exactly + names[idx] = item.Name + } + return names, nil +} + +func (c SecretClient) Create(secret types.Secret) error { + err := c.validateSecret(secret) + if err != nil { + return err + } + + req := &apiv1.Secret{ + Type: apiv1.SecretTypeOpaque, + ObjectMeta: metav1.ObjectMeta{ + Name: secret.Name, + Namespace: secret.Namespace, + Labels: map[string]string{ + secretLabel: secretLabelValue, + }, + }, + } + + if len(secret.Data) > 0 { + req.Data = secret.Data + } + + if len(secret.StringData) > 0 { + req.StringData = secret.StringData + } + + s, err := c.kube.Secrets(secret.Namespace).Get(context.Background(), secret.Name, metav1.GetOptions{}) + if err == nil && s != nil { + log.Printf("secret %s.%s already exists\n", secret.Name, secret.Namespace) + return nil + } + + _, err = c.kube.Secrets(secret.Namespace).Create(context.TODO(), req, metav1.CreateOptions{}) + if err != nil { + log.Printf("failed to create secret %s.%s: %v\n", secret.Name, secret.Namespace, err) + return err + } + + log.Printf("created secret %s.%s\n", secret.Name, secret.Namespace) + + return nil +} + +func (c SecretClient) Replace(secret types.Secret) error { + err := c.validateSecret(secret) + if err != nil { + return err + } + + kube := c.kube.Secrets(secret.Namespace) + found, err := kube.Get(context.TODO(), secret.Name, metav1.GetOptions{}) + if err != nil { + log.Printf("can not retrieve secret for update %s.%s: %v\n", secret.Name, secret.Namespace, err) + return err + } + + _, err = kube.Update(context.TODO(), found, metav1.UpdateOptions{}) + if err != nil { + log.Printf("can not update secret %s.%s: %v\n", secret.Name, secret.Namespace, err) + return err + } + + return nil +} + +func (c SecretClient) Delete(namespace string, name string) error { + err := c.kube.Secrets(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) + if err != nil { + log.Printf("can not delete %s.%s: %v\n", name, namespace, err) + } + return err +} + +func (c SecretClient) GetSecrets(namespace string, secretNames []string) (map[string]*apiv1.Secret, error) { + kube := c.kube.Secrets(namespace) + opts := metav1.GetOptions{} + + secrets := map[string]*apiv1.Secret{} + for _, secretName := range secretNames { + secret, err := kube.Get(context.TODO(), secretName, opts) + if err != nil { + return nil, err + } + secrets[secretName] = secret + } + + return secrets, nil +} + +func (c SecretClient) selector() metav1.ListOptions { + return metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", secretLabel, secretLabelValue), + } +} + +func (c SecretClient) validateSecret(secret types.Secret) error { + if strings.TrimSpace(secret.Namespace) == "" { + return errors.New("namespace may not be empty") + } + + if strings.TrimSpace(secret.Name) == "" { + return errors.New("name may not be empty") + } + + return nil +} diff --git a/agent/pkg/runtime/inference_create.go b/agent/pkg/runtime/inference_create.go index 72eca88..df17809 100644 --- a/agent/pkg/runtime/inference_create.go +++ b/agent/pkg/runtime/inference_create.go @@ -157,17 +157,11 @@ func makeInference(request types.InferenceDeployment) (*v2alpha1.Inference, erro } if request.Spec.Scaling != nil { - is.Spec.Scaling = &v2alpha1.ScalingConfig{ - MinReplicas: request.Spec.Scaling.MinReplicas, - MaxReplicas: request.Spec.Scaling.MaxReplicas, - TargetLoad: request.Spec.Scaling.TargetLoad, - ZeroDuration: request.Spec.Scaling.ZeroDuration, - StartupDuration: request.Spec.Scaling.StartupDuration, - } - if request.Spec.Scaling.Type != nil { - buf := v2alpha1.ScalingType(*request.Spec.Scaling.Type) - is.Spec.Scaling.Type = &buf - } + is.Spec.Scaling = request.Spec.Scaling + } + + if len(request.Spec.Volumes) != 0 { + is.Spec.Volumes = request.Spec.Volumes } rr, err := createResources(request) diff --git a/agent/pkg/runtime/mock/mock.go b/agent/pkg/runtime/mock/mock.go index d5c6a05..4c2ba03 100644 --- a/agent/pkg/runtime/mock/mock.go +++ b/agent/pkg/runtime/mock/mock.go @@ -82,6 +82,20 @@ func (mr *MockRuntimeMockRecorder) BuildList(ctx, namespace interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildList", reflect.TypeOf((*MockRuntime)(nil).BuildList), ctx, namespace) } +// CreateSecret mocks base method. +func (m *MockRuntime) CreateSecret(ctx context.Context, secret *types.Secret) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateSecret", ctx, secret) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateSecret indicates an expected call of CreateSecret. +func (mr *MockRuntimeMockRecorder) CreateSecret(ctx, secret interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSecret", reflect.TypeOf((*MockRuntime)(nil).CreateSecret), ctx, secret) +} + // GetClusterInfo mocks base method. func (m *MockRuntime) GetClusterInfo(cluster *types.ManagedCluster) error { m.ctrl.T.Helper() diff --git a/agent/pkg/runtime/runtime.go b/agent/pkg/runtime/runtime.go index 5e70a7e..0771ea8 100644 --- a/agent/pkg/runtime/runtime.go +++ b/agent/pkg/runtime/runtime.go @@ -17,6 +17,7 @@ import ( "github.com/tensorchord/openmodelz/agent/api/types" "github.com/tensorchord/openmodelz/agent/pkg/config" "github.com/tensorchord/openmodelz/agent/pkg/event" + "github.com/tensorchord/openmodelz/agent/pkg/k8s" ingressclient "github.com/tensorchord/openmodelz/ingress-operator/pkg/client/clientset/versioned" "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" apis "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" @@ -55,6 +56,8 @@ type Runtime interface { ServerList(ctx context.Context) ([]types.Server, error) // managed cluster GetClusterInfo(cluster *types.ManagedCluster) error + // secret + CreateSecret(ctx context.Context, secret *types.Secret) error } type generalRuntime struct { @@ -69,6 +72,7 @@ type generalRuntime struct { ingressClient ingressclient.Interface inferenceClient clientset.Interface kubefledgedClient kubefledged.Interface + secretClient k8s.SecretsClient logger *logrus.Entry eventRecorder event.Interface @@ -104,6 +108,7 @@ func New(clientConfig *rest.Config, clientConfig: clientConfig, ingressClient: ingressClient, inferenceClient: inferenceClient, + secretClient: k8s.NewSecretClient(kubeClient), logger: logrus.WithField("component", "runtime"), eventRecorder: eventRecorder, ingressEnabled: ingressEnabled, diff --git a/agent/pkg/runtime/secret.go b/agent/pkg/runtime/secret.go new file mode 100644 index 0000000..c25c50d --- /dev/null +++ b/agent/pkg/runtime/secret.go @@ -0,0 +1,15 @@ +package runtime + +import ( + "context" + + "github.com/tensorchord/openmodelz/agent/api/types" +) + +func (r generalRuntime) CreateSecret(ctx context.Context, secret *types.Secret) error { + err := r.secretClient.Create(*secret) + if err != nil { + return err + } + return nil +} diff --git a/agent/pkg/server/handler_secret_create.go b/agent/pkg/server/handler_secret_create.go new file mode 100644 index 0000000..bc5f128 --- /dev/null +++ b/agent/pkg/server/handler_secret_create.go @@ -0,0 +1,30 @@ +package server + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/tensorchord/openmodelz/agent/api/types" +) + +// @Summary Create the secret. +// @Description Create the secret. +// @Tags secret +// @Accept json +// @Produce json +// @Param body body types.Secret true "Secret" +// @Success 200 {object} types.Secret +// @Router /system/secrets [post] +func (s *Server) handleSecretCreate(c *gin.Context) error { + var req types.Secret + if err := c.ShouldBindJSON(&req); err != nil { + return NewError(http.StatusBadRequest, err, "failed to parse request body") + } + + if err := s.runtime.CreateSecret(c.Request.Context(), &req); err != nil { + return NewError(http.StatusInternalServerError, err, "failed to create secret") + } + + c.JSON(http.StatusOK, req) + return nil +} diff --git a/agent/pkg/server/server_init_route.go b/agent/pkg/server/server_init_route.go index 6f84ccb..720071e 100644 --- a/agent/pkg/server/server_init_route.go +++ b/agent/pkg/server/server_init_route.go @@ -21,6 +21,7 @@ const ( endpointHealthz = "/healthz" endpointBuild = "/build" endpointImageCache = "/image-cache" + endpointSecretPlural = "/secrets" ) func (s *Server) registerRoutes() { @@ -105,6 +106,9 @@ func (s *Server) registerRoutes() { // image cache controlPlane.POST(endpointImageCache, WrapHandler(s.handleImageCacheCreate)) + + // secret + controlPlane.POST(endpointSecretPlural, WrapHandler(s.handleSecretCreate)) } // registerMetricsRoutes registers the metrics routes. diff --git a/agent/pkg/server/validator/validator.go b/agent/pkg/server/validator/validator.go index de5aad3..c985eb0 100644 --- a/agent/pkg/server/validator/validator.go +++ b/agent/pkg/server/validator/validator.go @@ -7,6 +7,7 @@ import ( "k8s.io/apimachinery/pkg/util/rand" "github.com/tensorchord/openmodelz/agent/api/types" + modelzetes "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" ) const ( @@ -47,7 +48,7 @@ func (v Validator) ValidateService(service string) error { // DefaultDeployRequest sets default values for the deploy request. func (v Validator) DefaultDeployRequest(request *types.InferenceDeployment) { if request.Spec.Scaling == nil { - request.Spec.Scaling = &types.ScalingConfig{} + request.Spec.Scaling = &modelzetes.ScalingConfig{} } if request.Spec.Scaling.MinReplicas == nil { @@ -66,8 +67,8 @@ func (v Validator) DefaultDeployRequest(request *types.InferenceDeployment) { } if request.Spec.Scaling.Type == nil { - request.Spec.Scaling.Type = new(types.ScalingType) - *request.Spec.Scaling.Type = types.ScalingTypeCapacity + request.Spec.Scaling.Type = new(modelzetes.ScalingType) + *request.Spec.Scaling.Type = modelzetes.ScalingTypeCapacity } if request.Spec.Scaling.ZeroDuration == nil { diff --git a/modelzetes/pkg/apis/modelzetes/v2alpha1/types.go b/modelzetes/pkg/apis/modelzetes/v2alpha1/types.go index ad0e91e..4fae577 100644 --- a/modelzetes/pkg/apis/modelzetes/v2alpha1/types.go +++ b/modelzetes/pkg/apis/modelzetes/v2alpha1/types.go @@ -58,6 +58,8 @@ type InferenceSpec struct { // Limits for inference Resources *v1.ResourceRequirements `json:"resources,omitempty"` + + Volumes []VolumeConfig `json:"volumes,omitempty"` } // Framework is the inference framework. It is only used to set the default port @@ -98,6 +100,31 @@ const ( ScalingTypeRPS ScalingType = "rps" ) +const ( + VolumeTypeLocal VolumeType = "local" + VolumeTypeS3Fuse VolumeType = "s3fuse" + VolumeTypeGCSFuse VolumeType = "gcsfuse" +) + +type VolumeType string + +type VolumeConfig struct { + // Name is the name of the volume. + Name string `json:"name"` + // Type of the volume. + Type VolumeType `json:"type,omitempty"` + // MountPath is the path in pod to mount the volume. + MountPath string `json:"mount_path"` + // SubPath is the sub path of the volume. + SubPath *string `json:"sub_path,omitempty"` + // MountOption is the mount option. + MountOption *string `json:"mount_option,omitempty"` + // SecretName is the name of the secret. It is only used for object storage volume. + SecretName *string `json:"secret_name,omitempty"` + // NodeNames are the name list of the node. It is only used for local volume. + NodeNames []string `json:"node_name,omitempty"` +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // InferenceList is a list of inference resources diff --git a/modelzetes/pkg/consts/consts.go b/modelzetes/pkg/consts/consts.go index 7d7fe85..4b41190 100644 --- a/modelzetes/pkg/consts/consts.go +++ b/modelzetes/pkg/consts/consts.go @@ -30,4 +30,9 @@ const ( // MaxReplicas is the maximum number of replicas that can be set for a inference. MaxReplicas = 5 + + GCSCSIDriverName = "gcs.csi.ofek.dev" + GCSVolumeHandle = "csi-gcs" + GCSStorageClassName = "csi-gcs-sc" + LocalStorageClassName = "local-storage" ) diff --git a/modelzetes/pkg/controller/controller.go b/modelzetes/pkg/controller/controller.go index 3f7a810..624f64e 100644 --- a/modelzetes/pkg/controller/controller.go +++ b/modelzetes/pkg/controller/controller.go @@ -249,6 +249,49 @@ func (c *Controller) syncHandler(key string) error { } } + // Create persistentvolume if needed. + if len(function.Spec.Volumes) != 0 { + for _, volume := range function.Spec.Volumes { + + if (volume.Type == v2alpha1.VolumeTypeLocal) && (len(volume.NodeNames) == 0) { + // no need create pv and pvc for hostPath + continue + } + pvName := makePersistentVolumeName(volume.Name) + _, err := c.kubeclientset.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + err = nil + glog.Infof("Creating persistentvolume %s for '%s'", pvName, function.Spec.Name) + if _, err := c.kubeclientset.CoreV1().PersistentVolumes().Create(context.TODO(), + newPersistentVolume(function, volume), metav1.CreateOptions{}); err != nil { + if errors.IsAlreadyExists(err) { + err = nil + glog.V(2).Infof("Persistentvolume '%s' already exists. Skipping creation.", function.Spec.Name) + } else { + return err + } + } + } + + pvcName := makePersistentVolumeClaimName(volume.Name) + _, err = c.kubeclientset.CoreV1().PersistentVolumeClaims(function.Namespace).Get(context.TODO(), pvcName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + err = nil + glog.Infof("Creating persistentvolumeclaim %s for '%s'", pvcName, function.Spec.Name) + if _, err := c.kubeclientset.CoreV1().PersistentVolumeClaims(function.Namespace).Create(context.TODO(), + makePersistentVolumeClaim(function, volume), metav1.CreateOptions{}); err != nil { + if errors.IsAlreadyExists(err) { + err = nil + glog.V(2).Infof("Persistentvolumeclaim '%s' already exists. Skipping creation.", function.Spec.Name) + } else { + return err + } + } + } + } + } + // Create persistentvolumeclaim if needed. + // Get the deployment with the name specified in Function.spec deployment, err := c.deploymentsLister. Deployments(function.Namespace).Get(deploymentName) diff --git a/modelzetes/pkg/controller/deployment.go b/modelzetes/pkg/controller/deployment.go index b65b928..eff388f 100644 --- a/modelzetes/pkg/controller/deployment.go +++ b/modelzetes/pkg/controller/deployment.go @@ -61,6 +61,9 @@ func newDeployment( allowPrivilegeEscalation := false + volumes := makeVolumes(inference) + volumeMounts := makeVolumeMounts(inference) + deploymentSpec := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: inference.Spec.Name, @@ -115,24 +118,10 @@ func newDeployment( AllowPrivilegeEscalation: &allowPrivilegeEscalation, }, // TODO(xieydd): Add a function to set shm size - VolumeMounts: []corev1.VolumeMount{ - { - Name: "dshm", - MountPath: "/dev/shm", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "dshm", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{ - Medium: corev1.StorageMediumMemory, - }, - }, + VolumeMounts: volumeMounts, }, }, + Volumes: volumes, }, }, }, @@ -185,6 +174,72 @@ func newDeployment( return deploymentSpec } +func makeVolumes(inference *v2alpha1.Inference) []corev1.Volume { + volumes := []corev1.Volume{ + { + Name: "dshm", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{ + Medium: corev1.StorageMediumMemory, + }, + }, + }, + } + + if len(inference.Spec.Volumes) != 0 { + for _, volume := range inference.Spec.Volumes { + if (volume.Type == v2alpha1.VolumeTypeLocal) && (len(volume.NodeNames) == 0) { + types := corev1.HostPathDirectoryOrCreate + volumes = append(volumes, corev1.Volume{ + Name: volume.Name, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: *volume.SubPath, + Type: &types, + }, + }, + }) + } else { + volumes = append(volumes, corev1.Volume{ + Name: volume.Name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: makePersistentVolumeClaimName(volume.Name), + }, + }, + }) + } + } + } + + return volumes +} + +func makeVolumeMounts(inference *v2alpha1.Inference) []corev1.VolumeMount { + volumeMounts := []corev1.VolumeMount{ + { + Name: "dshm", + MountPath: "/dev/shm", + }, + } + + if len(inference.Spec.Volumes) != 0 { + for _, volume := range inference.Spec.Volumes { + volumeMount := corev1.VolumeMount{ + Name: makePersistentVolumeName(volume.Name), + MountPath: volume.MountPath, + } + + if volume.SubPath != nil { + volumeMount.SubPath = *volume.SubPath + } + volumeMounts = append(volumeMounts, volumeMount) + } + } + + return volumeMounts +} + func makeTolerationGPU() []corev1.Toleration { res := []corev1.Toleration{ { diff --git a/modelzetes/pkg/controller/persistentvolume.go b/modelzetes/pkg/controller/persistentvolume.go new file mode 100644 index 0000000..9a1a068 --- /dev/null +++ b/modelzetes/pkg/controller/persistentvolume.go @@ -0,0 +1,89 @@ +package controller + +import ( + "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" + "github.com/tensorchord/openmodelz/modelzetes/pkg/consts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + glog "k8s.io/klog" +) + +const ( + capacity = "100Gi" +) + +func makePersistentVolumeName(name string) string { + return name + "-pv" +} + +func newPersistentVolume(function *v2alpha1.Inference, volume v2alpha1.VolumeConfig) *corev1.PersistentVolume { + pv := &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: makePersistentVolumeName(volume.Name), + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(function, schema.GroupVersionKind{ + Group: v2alpha1.SchemeGroupVersion.Group, + Version: v2alpha1.SchemeGroupVersion.Version, + Kind: v2alpha1.Kind, + }), + }, + }, + Spec: corev1.PersistentVolumeSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete, + }, + } + + switch volume.Type { + case v2alpha1.VolumeTypeGCSFuse: + pv.Spec.Capacity = corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(capacity), + } + pv.Spec.StorageClassName = consts.GCSStorageClassName + csi := &corev1.CSIPersistentVolumeSource{ + Driver: consts.GCSCSIDriverName, + VolumeHandle: consts.GCSVolumeHandle, + } + if volume.SecretName != nil { + csi.NodePublishSecretRef = &corev1.SecretReference{ + Name: *volume.SecretName, + Namespace: function.Namespace, + } + } + pv.Spec.CSI = csi + case v2alpha1.VolumeTypeLocal: + pv.Spec.Capacity = corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(capacity), + } + mode := corev1.PersistentVolumeFilesystem + pv.Spec.VolumeMode = &mode + pv.Spec.StorageClassName = consts.LocalStorageClassName + pv.Spec.Local = &corev1.LocalVolumeSource{ + Path: *volume.SubPath, + } + pv.Spec.NodeAffinity = &corev1.VolumeNodeAffinity{ + Required: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: corev1.LabelHostname, + Operator: corev1.NodeSelectorOpIn, + Values: volume.NodeNames, + }, + }, + }, + }, + }, + } + default: + glog.Errorf("unknown volume type: %s", volume.Type) + return nil + } + + return pv +} diff --git a/modelzetes/pkg/controller/persistentvolumeclaim.go b/modelzetes/pkg/controller/persistentvolumeclaim.go new file mode 100644 index 0000000..d099eb1 --- /dev/null +++ b/modelzetes/pkg/controller/persistentvolumeclaim.go @@ -0,0 +1,59 @@ +package controller + +import ( + v2alpha1 "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" + "github.com/tensorchord/openmodelz/modelzetes/pkg/consts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + glog "k8s.io/klog" +) + +func makePersistentVolumeClaimName(name string) string { + return name + "-pvc" +} + +func makePersistentVolumeClaim(function *v2alpha1.Inference, volume v2alpha1.VolumeConfig) *corev1.PersistentVolumeClaim { + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: makePersistentVolumeClaimName(volume.Name), + Namespace: function.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(function, schema.GroupVersionKind{ + Group: v2alpha1.SchemeGroupVersion.Group, + Version: v2alpha1.SchemeGroupVersion.Version, + Kind: v2alpha1.Kind, + }), + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + VolumeName: makePersistentVolumeName(volume.Name), + }, + } + switch volume.Type { + case v2alpha1.VolumeTypeGCSFuse: + sc := consts.GCSStorageClassName + pvc.Spec.StorageClassName = &sc + pvc.Spec.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(capacity), + }, + } + case v2alpha1.VolumeTypeLocal: + sc := consts.LocalStorageClassName + pvc.Spec.StorageClassName = &sc + pvc.Spec.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse(capacity), + }, + } + default: + glog.Errorf("unknown volume type %s", volume.Type) + return nil + } + return pvc +} diff --git a/modelzetes/pkg/k8s/secrets.go b/modelzetes/pkg/k8s/secrets.go index 0b83efa..e3cd7ef 100644 --- a/modelzetes/pkg/k8s/secrets.go +++ b/modelzetes/pkg/k8s/secrets.go @@ -4,195 +4,19 @@ package k8s import ( - "context" "fmt" - "log" "sort" - "strings" - "github.com/pkg/errors" - types "github.com/tensorchord/openmodelz/agent/api/types" "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - typedV1 "k8s.io/client-go/kubernetes/typed/core/v1" ) const ( secretsMountPath = "/var/modelz/secrets" - secretLabel = "app.kubernetes.io/managed-by" - secretLabelValue = "modelz" secretsProjectVolumeNameTmpl = "projected-secrets" ) -// SecretsClient exposes the standardized CRUD behaviors for Kubernetes secrets. These methods -// will ensure that the secrets are structured and labelled correctly for use by the modelz system. -type SecretsClient interface { - // List returns a list of available function secrets. Only the names are returned - // to ensure we do not accidentally read or print the sensitive values during - // read operations. - List(namespace string) (names []string, err error) - // Create adds a new secret, with the appropriate labels and structure to be - // used as a function secret. - Create(secret types.Secret) error - // Replace updates the value of a function secret - Replace(secret types.Secret) error - // Delete removes a function secret - Delete(name string, namespace string) error - // GetSecrets queries Kubernetes for a list of secrets by name in the given k8s namespace. - // This should only be used if you need access to the actual secret structure/value. Specifically, - // inside the FunctionFactory. - GetSecrets(namespace string, secretNames []string) (map[string]*apiv1.Secret, error) -} - -// SecretInterfacer exposes the SecretInterface getter for the k8s client. -// This is implemented by the CoreV1Interface() interface in the Kubernetes client. -// The SecretsClient only needs this one interface, but needs to be able to set the -// namespaces when the interface is instantiated, meaning, we need the Getter and not the -// SecretInterface itself. -type SecretInterfacer interface { - // Secrets returns a SecretInterface scoped to the specified namespace - Secrets(namespace string) typedV1.SecretInterface -} - -type secretClient struct { - kube SecretInterfacer -} - -// NewSecretsClient constructs a new SecretsClient using the provided Kubernetes client. -func NewSecretsClient(kube kubernetes.Interface) SecretsClient { - return &secretClient{ - kube: kube.CoreV1(), - } -} - -func (c secretClient) List(namespace string) (names []string, err error) { - res, err := c.kube.Secrets(namespace).List(context.TODO(), c.selector()) - if err != nil { - log.Printf("failed to list secrets in %s: %v\n", namespace, err) - return nil, err - } - - names = make([]string, len(res.Items)) - for idx, item := range res.Items { - // this is safe because size of names matches res.Items exactly - names[idx] = item.Name - } - return names, nil -} - -func (c secretClient) Create(secret types.Secret) error { - err := c.validateSecret(secret) - if err != nil { - return err - } - - req := &apiv1.Secret{ - Type: apiv1.SecretTypeOpaque, - ObjectMeta: metav1.ObjectMeta{ - Name: secret.Name, - Namespace: secret.Namespace, - Labels: map[string]string{ - secretLabel: secretLabelValue, - }, - }, - } - - req.Data = c.getValidSecretData(secret) - - _, err = c.kube.Secrets(secret.Namespace).Create(context.TODO(), req, metav1.CreateOptions{}) - if err != nil { - log.Printf("failed to create secret %s.%s: %v\n", secret.Name, secret.Namespace, err) - return err - } - - log.Printf("created secret %s.%s\n", secret.Name, secret.Namespace) - - return nil -} - -func (c secretClient) Replace(secret types.Secret) error { - err := c.validateSecret(secret) - if err != nil { - return err - } - - kube := c.kube.Secrets(secret.Namespace) - found, err := kube.Get(context.TODO(), secret.Name, metav1.GetOptions{}) - if err != nil { - log.Printf("can not retrieve secret for update %s.%s: %v\n", secret.Name, secret.Namespace, err) - return err - } - - found.Data = c.getValidSecretData(secret) - - _, err = kube.Update(context.TODO(), found, metav1.UpdateOptions{}) - if err != nil { - log.Printf("can not update secret %s.%s: %v\n", secret.Name, secret.Namespace, err) - return err - } - - return nil -} - -func (c secretClient) Delete(namespace string, name string) error { - err := c.kube.Secrets(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) - if err != nil { - log.Printf("can not delete %s.%s: %v\n", name, namespace, err) - } - return err -} - -func (c secretClient) GetSecrets(namespace string, secretNames []string) (map[string]*apiv1.Secret, error) { - kube := c.kube.Secrets(namespace) - opts := metav1.GetOptions{} - - secrets := map[string]*apiv1.Secret{} - for _, secretName := range secretNames { - secret, err := kube.Get(context.TODO(), secretName, opts) - if err != nil { - return nil, err - } - secrets[secretName] = secret - } - - return secrets, nil -} - -func (c secretClient) selector() metav1.ListOptions { - return metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", secretLabel, secretLabelValue), - } -} - -func (c secretClient) validateSecret(secret types.Secret) error { - if strings.TrimSpace(secret.Namespace) == "" { - return errors.New("namespace may not be empty") - } - - if strings.TrimSpace(secret.Name) == "" { - return errors.New("name may not be empty") - } - - return nil -} - -func (c secretClient) getValidSecretData(secret types.Secret) map[string][]byte { - - if len(secret.RawValue) > 0 { - return map[string][]byte{ - secret.Name: secret.RawValue, - } - } - - return map[string][]byte{ - secret.Name: []byte(secret.Value), - } - -} - // ConfigureSecrets will update the Deployment spec to include secrets that have been deployed // in the kubernetes cluster. For each requested secret, we inspect the type and add it to the // deployment spec as appropriate: secrets with type `SecretTypeDockercfg/SecretTypeDockerjson` From f8df59475a5ffab667fc0242bf9e6b6cce03ca5c Mon Sep 17 00:00:00 2001 From: xieydd Date: Mon, 30 Oct 2023 15:40:09 +0800 Subject: [PATCH 2/3] Fix ci error Signed-off-by: xieydd --- mdz/pkg/cmd/deploy.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mdz/pkg/cmd/deploy.go b/mdz/pkg/cmd/deploy.go index adb4fc5..f0fdf32 100644 --- a/mdz/pkg/cmd/deploy.go +++ b/mdz/pkg/cmd/deploy.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/tensorchord/openmodelz/agent/api/types" "github.com/tensorchord/openmodelz/mdz/pkg/telemetry" + modelzetes "github.com/tensorchord/openmodelz/modelzetes/pkg/apis/modelzetes/v2alpha1" ) var ( @@ -70,7 +71,7 @@ func commandDeploy(cmd *cobra.Command, args []string) error { name = petname.Generate(2, "-") } - var typ types.ScalingType = types.ScalingTypeCapacity + var typ modelzetes.ScalingType = modelzetes.ScalingTypeCapacity inf := types.InferenceDeployment{ Spec: types.InferenceDeploymentSpec{ Image: deployImage, @@ -80,7 +81,7 @@ func commandDeploy(cmd *cobra.Command, args []string) error { "ai.tensorchord.name": name, }, Framework: types.FrameworkOther, - Scaling: &types.ScalingConfig{ + Scaling: &modelzetes.ScalingConfig{ MinReplicas: int32Ptr(deployMinReplicas), MaxReplicas: int32Ptr(deployMaxReplicas), TargetLoad: int32Ptr(10), From 33bca1b39e1eb3890a2b8f0e1d5e8076adfda0c1 Mon Sep 17 00:00:00 2001 From: xieydd Date: Fri, 3 Nov 2023 14:33:01 +0800 Subject: [PATCH 3/3] chore: Fix some error in test Signed-off-by: xieydd --- .../crds/tensorchord.ai_inferences.yaml | 35 ++++++++++++++++-- modelzetes/pkg/controller/controller.go | 36 +++++++++++++++++-- modelzetes/pkg/controller/deployment.go | 2 +- modelzetes/pkg/controller/persistentvolume.go | 8 ----- 4 files changed, 68 insertions(+), 13 deletions(-) diff --git a/modelzetes/artifacts/crds/tensorchord.ai_inferences.yaml b/modelzetes/artifacts/crds/tensorchord.ai_inferences.yaml index 8341c7e..13b65f9 100644 --- a/modelzetes/artifacts/crds/tensorchord.ai_inferences.yaml +++ b/modelzetes/artifacts/crds/tensorchord.ai_inferences.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.5.0 + controller-gen.kubebuilder.io/version: v0.7.0 creationTimestamp: null name: inferences.tensorchord.ai spec: @@ -107,7 +107,7 @@ spec: - type: string x-kubernetes-int-or-string: true requests: - description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' + description: 'Requests describes the minimum amount of compute resources required. If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, otherwise to an implementation-defined value. Requests cannot exceed Limits. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object additionalProperties: pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ @@ -147,6 +147,37 @@ spec: type: array items: type: string + volumes: + type: array + items: + type: object + required: + - mount_path + - name + properties: + mount_option: + description: MountOption is the mount option. + type: string + mount_path: + description: MountPath is the path in pod to mount the volume. + type: string + name: + description: Name is the name of the volume. + type: string + node_name: + description: NodeNames are the name list of the node. It is only used for local volume. + type: array + items: + type: string + secret_name: + description: SecretName is the name of the secret. It is only used for object storage volume. + type: string + sub_path: + description: SubPath is the sub path of the volume. + type: string + type: + description: Type of the volume. + type: string served: true storage: true subresources: {} diff --git a/modelzetes/pkg/controller/controller.go b/modelzetes/pkg/controller/controller.go index 624f64e..9e14365 100644 --- a/modelzetes/pkg/controller/controller.go +++ b/modelzetes/pkg/controller/controller.go @@ -119,6 +119,12 @@ func NewController( UpdateFunc: func(old, new interface{}) { controller.enqueueFunction(new) }, + DeleteFunc: func(obj interface{}) { + _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + controller.deleteFunction(obj) + } + }, }) // Set up an event handler for when functions related resources like pods, deployments, replica sets @@ -250,9 +256,8 @@ func (c *Controller) syncHandler(key string) error { } // Create persistentvolume if needed. - if len(function.Spec.Volumes) != 0 { + if len(function.Spec.Volumes) > 0 { for _, volume := range function.Spec.Volumes { - if (volume.Type == v2alpha1.VolumeTypeLocal) && (len(volume.NodeNames) == 0) { // no need create pv and pvc for hostPath continue @@ -399,6 +404,33 @@ func (c *Controller) enqueueFunction(obj interface{}) { c.workqueue.AddRateLimited(key) } +func (c *Controller) deleteFunction(obj interface{}) { + + function := obj.(*v2alpha1.Inference) + + if len(function.Spec.Volumes) > 0 { + for _, volume := range function.Spec.Volumes { + if (volume.Type == v2alpha1.VolumeTypeLocal) && (len(volume.NodeNames) == 0) { + // no need create pv and pvc for hostPath + continue + } + pvName := makePersistentVolumeName(volume.Name) + err := c.kubeclientset.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, metav1.DeleteOptions{}) + + if errors.IsNotFound(err) { + glog.Infof("Persistentvolume '%s' already deleted. Skipping deletion.", function.Spec.Name) + return + } + if err != nil { + runtime.HandleError(fmt.Errorf("failed to delete persistentvolume %s for '%s': %s", pvName, function.Spec.Name, err.Error())) + return + } + glog.Infof("Deleted persistentvolume %s for '%s'", pvName, function.Spec.Name) + } + } + return +} + // handleObject will take any resource implementing metav1.Object and attempt // to find the Function resource that 'owns' it. It does this by looking at the // objects metadata.ownerReferences field for an appropriate OwnerReference. diff --git a/modelzetes/pkg/controller/deployment.go b/modelzetes/pkg/controller/deployment.go index eff388f..f910839 100644 --- a/modelzetes/pkg/controller/deployment.go +++ b/modelzetes/pkg/controller/deployment.go @@ -226,7 +226,7 @@ func makeVolumeMounts(inference *v2alpha1.Inference) []corev1.VolumeMount { if len(inference.Spec.Volumes) != 0 { for _, volume := range inference.Spec.Volumes { volumeMount := corev1.VolumeMount{ - Name: makePersistentVolumeName(volume.Name), + Name: volume.Name, MountPath: volume.MountPath, } diff --git a/modelzetes/pkg/controller/persistentvolume.go b/modelzetes/pkg/controller/persistentvolume.go index 9a1a068..10f9087 100644 --- a/modelzetes/pkg/controller/persistentvolume.go +++ b/modelzetes/pkg/controller/persistentvolume.go @@ -6,7 +6,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" glog "k8s.io/klog" ) @@ -22,13 +21,6 @@ func newPersistentVolume(function *v2alpha1.Inference, volume v2alpha1.VolumeCon pv := &corev1.PersistentVolume{ ObjectMeta: metav1.ObjectMeta{ Name: makePersistentVolumeName(volume.Name), - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(function, schema.GroupVersionKind{ - Group: v2alpha1.SchemeGroupVersion.Group, - Version: v2alpha1.SchemeGroupVersion.Version, - Kind: v2alpha1.Kind, - }), - }, }, Spec: corev1.PersistentVolumeSpec{ AccessModes: []corev1.PersistentVolumeAccessMode{