Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support resource in sidecar #991

Merged
merged 6 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/katib-controller/v1alpha3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ func main() {
var webhookPort int
var certLocalFS bool
var injectSecurityContext bool
var serviceName string

flag.StringVar(&experimentSuggestionName, "experiment-suggestion-name",
"default", "The implementation of suggestion interface in experiment controller (default|fake)")
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.IntVar(&webhookPort, "webhook-port", 8443, "The port number to be used for admission webhook server.")
flag.BoolVar(&certLocalFS, "cert-localfs", false, "Store the webhook cert in local file system")
flag.BoolVar(&injectSecurityContext, "webhook-inject-securitycontext", false, "Inject the securityContext of container[0] in the sidecar")
flag.StringVar(&serviceName, "webhook-service-name", "katib-controller", "The service name which will be used in webhook")

flag.Parse()

Expand Down Expand Up @@ -102,7 +104,7 @@ func main() {
}

log.Info("Setting up webhooks")
if err := webhook.AddToManager(mgr, int32(webhookPort)); err != nil {
if err := webhook.AddToManager(mgr, int32(webhookPort), serviceName); err != nil {
log.Error(err, "unable to register webhooks to the manager")
os.Exit(1)
}
Expand Down
7 changes: 6 additions & 1 deletion manifests/v1alpha3/katib-controller/katib-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ data:
"image": "gcr.io/kubeflow-images-public/katib/v1alpha3/file-metrics-collector"
},
"TensorFlowEvent": {
"image": "gcr.io/kubeflow-images-public/katib/v1alpha3/tfevent-metrics-collector"
"image": "gcr.io/kubeflow-images-public/katib/v1alpha3/tfevent-metrics-collector",
"resources": {
"limits": {
"memory": "1Gi"
}
}
}
}
suggestion: |-
Expand Down
17 changes: 15 additions & 2 deletions pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,28 @@ const (
DefaultMemRequest = "10Mi"
// LabelSuggestionDiskLimitTag is the name of suggestion Disk Limit config in configmap.
LabelSuggestionDiskLimitTag = "diskLimit"
// DefaultMemLimit is the default value for mem Limit
// DefaultDiskLimit is the default value for disk limit.
DefaultDiskLimit = "5Gi"
// LabelSuggestionDiskRequestTag is the name of suggestion Disk Request config in configmap.
LabelSuggestionDiskRequestTag = "diskRequest"
// DefaultMemRequest is the default value for mem Request
// DefaultDiskRequest is the default value for disk request.
DefaultDiskRequest = "500Mi"
// LabelMetricsCollectorSidecar is the name of metrics collector config in configmap.
LabelMetricsCollectorSidecar = "metrics-collector-sidecar"
// LabelMetricsCollectorSidecarImage is the name of metrics collector image config in configmap.
LabelMetricsCollectorSidecarImage = "image"
// LabelMetricsCollectorCPULimitTag is the name of metrics collector CPU Limit config in configmap.
LabelMetricsCollectorCPULimitTag = "cpuLimit"
// LabelMetricsCollectorCPURequestTag is the name of metrics collector CPU Request config in configmap.
LabelMetricsCollectorCPURequestTag = "cpuRequest"
// LabelMetricsCollectorMemLimitTag is the name of metrics collector Mem Limit config in configmap.
LabelMetricsCollectorMemLimitTag = "memLimit"
// LabelMetricsCollectorMemRequestTag is the name of metrics collector Mem Request config in configmap.
LabelMetricsCollectorMemRequestTag = "memRequest"
// LabelMetricsCollectorDiskLimitTag is the name of metrics collector Disk Limit config in configmap.
LabelMetricsCollectorDiskLimitTag = "diskLimit"
// LabelMetricsCollectorDiskRequestTag is the name of metrics collector Disk Request config in configmap.
LabelMetricsCollectorDiskRequestTag = "diskRequest"

// ReconcileErrorReason is the reason when there is a reconcile error.
ReconcileErrorReason = "ReconcileError"
Expand All @@ -93,5 +105,6 @@ const (
)

var (
// DefaultKatibNamespace is the default namespace of katib deployment.
DefaultKatibNamespace = env.GetEnvOrDefault(DefaultKatibNamespaceEnvName, "kubeflow")
)
7 changes: 6 additions & 1 deletion pkg/controller.v1alpha3/experiment/manifest/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

commonapiv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibconfig"
)
Expand Down Expand Up @@ -45,7 +46,11 @@ func (g *DefaultGenerator) InjectClient(c client.Client) {
}

func (g *DefaultGenerator) GetMetricsCollectorImage(cKind commonapiv1alpha3.CollectorKind) (string, error) {
return katibconfig.GetMetricsCollectorImage(cKind, g.client.GetClient())
configData, err := katibconfig.GetMetricsCollectorConfigData(cKind, g.client.GetClient())
if err != nil {
return "", nil
}
return configData[consts.LabelMetricsCollectorSidecarImage], nil
}

func (g *DefaultGenerator) GetSuggestionConfigData(algorithmName string) (map[string]string, error) {
Expand Down
88 changes: 68 additions & 20 deletions pkg/util/v1alpha3/katibconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ import (
"github.com/kubeflow/katib/pkg/controller.v1alpha3/consts"
)

type suggestionConfigJSON struct {
Image string `json:"image"`
Resource corev1.ResourceRequirements `json:"resources"`
}

type metricsCollectorConfigJSON struct {
Image string `json:"image"`
Resource corev1.ResourceRequirements `json:"resources"`
}

// GetSuggestionConfigData gets the config data for the given algorithm name.
func GetSuggestionConfigData(algorithmName string, client client.Client) (map[string]string, error) {
configMap := &corev1.ConfigMap{}
suggestionConfigData := map[string]string{}
Expand All @@ -24,10 +35,7 @@ func GetSuggestionConfigData(algorithmName string, client client.Client) (map[st
if err != nil {
return map[string]string{}, err
}
type suggestionConfigJSON struct {
Image string `json:"image"`
Resource corev1.ResourceRequirements `json:"resources"`
}

if config, ok := configMap.Data[consts.LabelSuggestionTag]; ok {
suggestionsConfig := map[string]suggestionConfigJSON{}
if err := json.Unmarshal([]byte(config), &suggestionsConfig); err != nil {
Expand Down Expand Up @@ -87,35 +95,75 @@ func GetSuggestionConfigData(algorithmName string, client client.Client) (map[st
return suggestionConfigData, nil
}

func GetMetricsCollectorImage(cKind common.CollectorKind, client client.Client) (string, error) {
// GetMetricsCollectorConfigData gets the config data for the given kind.
func GetMetricsCollectorConfigData(cKind common.CollectorKind, client client.Client) (map[string]string, error) {
configMap := &corev1.ConfigMap{}
metricsCollectorConfigData := map[string]string{}
err := client.Get(
context.TODO(),
apitypes.NamespacedName{Name: consts.KatibConfigMapName, Namespace: consts.DefaultKatibNamespace},
configMap)
if err != nil {
return "", err
return metricsCollectorConfigData, err
}
if mcs, ok := configMap.Data[consts.LabelMetricsCollectorSidecar]; ok {
// Get the config with name metrics-collector-sidecar.
if config, ok := configMap.Data[consts.LabelMetricsCollectorSidecar]; ok {
kind := string(cKind)
mcsConfig := map[string]map[string]string{}
if err := json.Unmarshal([]byte(mcs), &mcsConfig); err != nil {
return "", err
mcsConfig := map[string]metricsCollectorConfigJSON{}
if err := json.Unmarshal([]byte(config), &mcsConfig); err != nil {
return metricsCollectorConfigData, err
}
if mc, ok := mcsConfig[kind]; ok {
if image, yes := mc[consts.LabelMetricsCollectorSidecarImage]; yes {
if strings.TrimSpace(image) != "" {
return image, nil
} else {
return "", errors.New("Required value for " + consts.LabelMetricsCollectorSidecarImage + "configuration of metricsCollector kind " + kind)
}
// Get the config for the given cKind.
if metricsCollectorConfig, ok := mcsConfig[kind]; ok {
image := metricsCollectorConfig.Image
// If the image is not empty, we set it into result.
if strings.TrimSpace(image) != "" {
metricsCollectorConfigData[consts.LabelMetricsCollectorSidecarImage] = image
} else {
return "", errors.New("Failed to find " + consts.LabelMetricsCollectorSidecarImage + " configuration of metricsCollector kind " + kind)
return metricsCollectorConfigData, errors.New("Required value for " + consts.LabelMetricsCollectorSidecarImage + "configuration of metricsCollector kind " + kind)
}

// Set default values for CPU, Memory and Disk
metricsCollectorConfigData[consts.LabelMetricsCollectorCPURequestTag] = consts.DefaultCPURequest
metricsCollectorConfigData[consts.LabelMetricsCollectorMemRequestTag] = consts.DefaultMemRequest
metricsCollectorConfigData[consts.LabelMetricsCollectorDiskRequestTag] = consts.DefaultDiskRequest
metricsCollectorConfigData[consts.LabelMetricsCollectorCPULimitTag] = consts.DefaultCPULimit
metricsCollectorConfigData[consts.LabelMetricsCollectorMemLimitTag] = consts.DefaultMemLimit
metricsCollectorConfigData[consts.LabelMetricsCollectorDiskLimitTag] = consts.DefaultDiskLimit

// Get CPU, Memory and Disk Requests from config
cpuRequest := metricsCollectorConfig.Resource.Requests[corev1.ResourceCPU]
memRequest := metricsCollectorConfig.Resource.Requests[corev1.ResourceMemory]
diskRequest := metricsCollectorConfig.Resource.Requests[corev1.ResourceEphemeralStorage]
if !cpuRequest.IsZero() {
metricsCollectorConfigData[consts.LabelSuggestionCPURequestTag] = cpuRequest.String()
}
if !memRequest.IsZero() {
metricsCollectorConfigData[consts.LabelSuggestionMemRequestTag] = memRequest.String()
}
if !diskRequest.IsZero() {
metricsCollectorConfigData[consts.LabelSuggestionDiskRequestTag] = diskRequest.String()
}

// Get CPU, Memory and Disk Limits from config
cpuLimit := metricsCollectorConfig.Resource.Limits[corev1.ResourceCPU]
memLimit := metricsCollectorConfig.Resource.Limits[corev1.ResourceMemory]
diskLimit := metricsCollectorConfig.Resource.Limits[corev1.ResourceEphemeralStorage]
if !cpuLimit.IsZero() {
metricsCollectorConfigData[consts.LabelSuggestionCPULimitTag] = cpuLimit.String()
}
if !memLimit.IsZero() {
metricsCollectorConfigData[consts.LabelSuggestionMemLimitTag] = memLimit.String()
}
if !diskLimit.IsZero() {
metricsCollectorConfigData[consts.LabelSuggestionDiskLimitTag] = diskLimit.String()
}

} else {
return "", errors.New("Cannot support metricsCollector injection for kind " + kind)
return metricsCollectorConfigData, errors.New("Cannot support metricsCollector injection for kind " + kind)
}
} else {
return "", errors.New("Failed to find metrics collector configuration in configmap " + consts.KatibConfigMapName)
return metricsCollectorConfigData, errors.New("Failed to find metrics collector configuration in configmap " + consts.KatibConfigMapName)
}
return metricsCollectorConfigData, nil
}
51 changes: 49 additions & 2 deletions pkg/webhook/v1alpha3/pod/inject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/spf13/viper"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -176,18 +177,64 @@ func (s *sidecarInjector) getMetricsCollectorContainer(trial *trialsv1alpha3.Tri
metricName += ";"
metricName += v
}
image, err := katibconfig.GetMetricsCollectorImage(mc.Collector.Kind, s.client)
metricsCollectorConfigData, err := katibconfig.GetMetricsCollectorConfigData(mc.Collector.Kind, s.client)
if err != nil {
return nil, err
}
args := getMetricsCollectorArgs(trial.Name, metricName, mc)
sidecarContainerName := getSidecarContainerName(trial.Spec.MetricsCollector.Collector.Kind)

// Get metricsCollector data from config
metricsCollectorContainerImage := metricsCollectorConfigData[consts.LabelMetricsCollectorSidecarImage]
metricsCollectorCPULimit := metricsCollectorConfigData[consts.LabelMetricsCollectorCPULimitTag]
metricsCollectorCPURequest := metricsCollectorConfigData[consts.LabelMetricsCollectorCPURequestTag]
metricsCollectorMemLimit := metricsCollectorConfigData[consts.LabelMetricsCollectorMemLimitTag]
metricsCollectorMemRequest := metricsCollectorConfigData[consts.LabelMetricsCollectorMemRequestTag]
metricsCollectorDiskLimit := metricsCollectorConfigData[consts.LabelMetricsCollectorDiskLimitTag]
metricsCollectorDiskRequest := metricsCollectorConfigData[consts.LabelMetricsCollectorDiskRequestTag]

cpuLimitQuantity, err := resource.ParseQuantity(metricsCollectorCPULimit)
if err != nil {
return nil, err
}
cpuRequestQuantity, err := resource.ParseQuantity(metricsCollectorCPURequest)
if err != nil {
return nil, err
}
memLimitQuantity, err := resource.ParseQuantity(metricsCollectorMemLimit)
if err != nil {
return nil, err
}
memRequestQuantity, err := resource.ParseQuantity(metricsCollectorMemRequest)
if err != nil {
return nil, err
}
diskLimitQuantity, err := resource.ParseQuantity(metricsCollectorDiskLimit)
if err != nil {
return nil, err
}
diskRequestQuantity, err := resource.ParseQuantity(metricsCollectorDiskRequest)
if err != nil {
return nil, err
}

injectContainer := v1.Container{
Name: sidecarContainerName,
Image: image,
Image: metricsCollectorContainerImage,
Args: args,
ImagePullPolicy: v1.PullIfNotPresent,
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: cpuLimitQuantity,
v1.ResourceMemory: memLimitQuantity,
v1.ResourceEphemeralStorage: diskLimitQuantity,
},
Requests: v1.ResourceList{
v1.ResourceCPU: cpuRequestQuantity,
v1.ResourceMemory: memRequestQuantity,
v1.ResourceEphemeralStorage: diskRequestQuantity,
},
},
}

// Inject the security context when the flag is enabled.
Expand Down
12 changes: 4 additions & 8 deletions pkg/webhook/v1alpha3/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,15 @@ import (
"github.com/kubeflow/katib/pkg/webhook/v1alpha3/pod"
)

const (
katibControllerName = "katib-controller"
)

func AddToManager(m manager.Manager, port int32) error {
func AddToManager(m manager.Manager, port int32, serviceName string) error {
so := webhook.ServerOptions{
CertDir: "/tmp/cert",
BootstrapOptions: &webhook.BootstrapOptions{
Service: &webhook.Service{
Namespace: consts.DefaultKatibNamespace,
Name: katibControllerName,
Name: serviceName,
Selectors: map[string]string{
"app": katibControllerName,
"app": serviceName,
},
},
ValidatingWebhookConfigName: "katib-validating-webhook-config",
Expand All @@ -59,7 +55,7 @@ func AddToManager(m manager.Manager, port int32) error {
if !usingFS {
so.BootstrapOptions.Secret = &types.NamespacedName{
Namespace: consts.DefaultKatibNamespace,
Name: katibControllerName,
Name: serviceName,
}
}
server, err := webhook.NewServer("katib-admission-server", m, so)
Expand Down