Skip to content

Commit

Permalink
Fix Pod's ownership to inject metrics collector (#1303)
Browse files Browse the repository at this point in the history
* Refactor get Katib job

* Get trial after func

* Remove trialName

* return error

* Remove error

* Resolve
  • Loading branch information
andreyvelich authored Sep 3, 2020
1 parent 36aef5f commit cba4560
Show file tree
Hide file tree
Showing 4 changed files with 439 additions and 192 deletions.
4 changes: 4 additions & 0 deletions pkg/webhook/v1beta1/pod/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
const (
MasterRole = "master"
BatchJob = "Job"
// TrialKind is the name of Trial kind
TrialKind = "Trial"
// TrialAPIVersion is the name of Trial API Version
TrialAPIVersion = "kubeflow.org/v1beta1"
)

var (
Expand Down
206 changes: 64 additions & 142 deletions pkg/webhook/v1beta1/pod/inject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package pod

import (
"context"
"fmt"
"errors"
"net/http"
"path/filepath"
"strings"

"github.com/spf13/viper"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -35,10 +35,8 @@ import (

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
katibmanagerv1beta1 "github.com/kubeflow/katib/pkg/common/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1"
mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common"
"github.com/kubeflow/katib/pkg/controller.v1beta1/util"
"github.com/kubeflow/katib/pkg/util/v1beta1/katibconfig"
)

Expand Down Expand Up @@ -108,7 +106,13 @@ func NewSidecarInjector(c client.Client) *sidecarInjector {
}

func (s *sidecarInjector) MutationRequired(pod *v1.Pod, ns string) (bool, error) {
jobKind, jobName, err := getKatibJob(pod)
object, err := util.ConvertObjectToUnstructured(pod)
if err != nil {
return false, err
}

// Try to get Katib job kind and job name from mutating pod
jobKind, jobName, err := s.getKatibJob(object, ns)
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -141,9 +145,17 @@ func (s *sidecarInjector) MutationRequired(pod *v1.Pod, ns string) (bool, error)
func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) {
mutatedPod := pod.DeepCopy()

kind, trialName, _ := getKatibJob(pod)
object, err := util.ConvertObjectToUnstructured(pod)
if err != nil {
return nil, err
}

// Try to get Katib job kind and job name from mutating pod
jobKind, jobName, _ := s.getKatibJob(object, namespace)

trial := &trialsv1beta1.Trial{}
if err := s.client.Get(context.TODO(), apitypes.NamespacedName{Name: trialName, Namespace: namespace}, trial); err != nil {
// jobName and Trial name is equal
if err := s.client.Get(context.TODO(), apitypes.NamespacedName{Name: jobName, Namespace: namespace}, trial); err != nil {
return nil, err
}

Expand All @@ -157,16 +169,16 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)

mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector)
if mountPath != "" {
if err = mutateVolume(mutatedPod, kind, mountPath, injectContainer.Name, pathKind); err != nil {
if err = mutateVolume(mutatedPod, jobKind, mountPath, injectContainer.Name, pathKind); err != nil {
return nil, err
}
}
if needWrapWorkerContainer(trial.Spec.MetricsCollector) {
if err = wrapWorkerContainer(mutatedPod, namespace, kind, mountPath, pathKind, trial.Spec.MetricsCollector); err != nil {
if err = wrapWorkerContainer(mutatedPod, namespace, jobKind, mountPath, pathKind, trial.Spec.MetricsCollector); err != nil {
return nil, err
}
}
log.Info("Inject metrics collector sidecar container", "Pod Generate Name", mutatedPod.GenerateName, "Trial", trialName)
log.Info("Inject metrics collector sidecar container", "Pod Generate Name", mutatedPod.GenerateName, "Trial", jobName)
return mutatedPod, nil
}

Expand Down Expand Up @@ -206,143 +218,53 @@ func (s *sidecarInjector) getMetricsCollectorContainer(trial *trialsv1beta1.Tria
return &injectContainer, nil
}

func getMetricsCollectorArgs(trialName, metricName string, mc common.MetricsCollectorSpec) []string {
args := []string{"-t", trialName, "-m", metricName, "-s", katibmanagerv1beta1.GetDBManagerAddr()}
if mountPath, _ := getMountPath(mc); mountPath != "" {
args = append(args, "-path", mountPath)
}
if mc.Source != nil && mc.Source.Filter != nil && len(mc.Source.Filter.MetricsFormat) > 0 {
args = append(args, "-f", strings.Join(mc.Source.Filter.MetricsFormat, ";"))
}
return args
}

func getMountPath(mc common.MetricsCollectorSpec) (string, common.FileSystemKind) {
if mc.Collector.Kind == common.StdOutCollector {
return common.DefaultFilePath, common.FileKind
} else if mc.Collector.Kind == common.FileCollector {
return mc.Source.FileSystemPath.Path, common.FileKind
} else if mc.Collector.Kind == common.TfEventCollector {
return mc.Source.FileSystemPath.Path, common.DirectoryKind
} else if mc.Collector.Kind == common.CustomCollector {
if mc.Source == nil || mc.Source.FileSystemPath == nil {
return "", common.InvalidKind
}
return mc.Source.FileSystemPath.Path, mc.Source.FileSystemPath.Kind
} else {
return "", common.InvalidKind
}
}

func needWrapWorkerContainer(mc common.MetricsCollectorSpec) bool {
mcKind := mc.Collector.Kind
for _, kind := range NeedWrapWorkerMetricsCollecterList {
if mcKind == kind {
return true
}
}
return false
}

func wrapWorkerContainer(
pod *v1.Pod, namespace, jobKind, metricsFile string,
pathKind common.FileSystemKind,
mc common.MetricsCollectorSpec) error {
index := -1
for i, c := range pod.Spec.Containers {
jobProvider, err := jobv1beta1.New(jobKind)
if err != nil {
return err
}
if jobProvider.IsTrainingContainer(i, c) {
index = i
break
func (s *sidecarInjector) getKatibJob(object *unstructured.Unstructured, namespace string) (string, string, error) {
owners := object.GetOwnerReferences()
// jobKind and jobName points to the object kind and name that Trial is created
jobKind := ""
jobName := ""
// Search for Trial owner in object owner references
// Trial is owned object if kind = Trial kind and API version = Trial API version
for _, owner := range owners {
if owner.Kind == TrialKind && owner.APIVersion == TrialAPIVersion {
jobKind = object.GetKind()
jobName = object.GetName()
}
}
if index >= 0 {
command := []string{"sh", "-c"}
args, err := getContainerCommand(pod, namespace, index)
if err != nil {
return err
}
// If the first two commands are sh -c, we do not inject command.
if args[0] == "sh" || args[0] == "bash" {
if args[1] == "-c" {
command = args[0:2]
args = args[2:]
// If Trial is not found in object owners search for nested owners
if jobKind == "" {
i := 0
// Search for Trial ownership unless jobKind is empty and owners is exists
for jobKind == "" && i < len(owners) {
nestedJob := &unstructured.Unstructured{}
// Get group and version from owner API version
gv, err := schema.ParseGroupVersion(owners[i].APIVersion)
if err != nil {
return "", "", err
}
}
if mc.Collector.Kind == common.StdOutCollector {
redirectStr := fmt.Sprintf("1>%s 2>&1", metricsFile)
args = append(args, redirectStr)
}
args = append(args, "&&", getMarkCompletedCommand(metricsFile, pathKind))
argsStr := strings.Join(args, " ")
c := &pod.Spec.Containers[index]
c.Command = command
c.Args = []string{argsStr}
}
return nil
}

func getMarkCompletedCommand(mountPath string, pathKind common.FileSystemKind) string {
dir := mountPath
if pathKind == common.FileKind {
dir = filepath.Dir(mountPath)
}
// $$ is process id in shell
pidFile := filepath.Join(dir, "$$$$.pid")
return fmt.Sprintf("echo %s > %s", mccommon.TrainingCompleted, pidFile)
}

func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName string, pathKind common.FileSystemKind) error {
metricsVol := v1.Volume{
Name: common.MetricsVolume,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}
dir := mountPath
if pathKind == common.FileKind {
dir = filepath.Dir(mountPath)
}
vm := v1.VolumeMount{
Name: metricsVol.Name,
MountPath: dir,
}
indexList := []int{}
for i, c := range pod.Spec.Containers {
shouldMount := false
if c.Name == sidecarContainerName {
shouldMount = true
} else {
jobProvider, err := jobv1beta1.New(jobKind)
gvk := schema.GroupVersionKind{
Group: gv.Group,
Version: gv.Version,
Kind: owners[i].Kind,
}
// Set GVK for nested unstructured object
nestedJob.SetGroupVersionKind(gvk)
// Get nested object from cluster.
// Nested object namespace must be equal to object namespace
err = s.client.Get(context.TODO(), apitypes.NamespacedName{Name: owners[i].Name, Namespace: namespace}, nestedJob)
if err != nil {
return err
return "", "", err
}
shouldMount = jobProvider.IsTrainingContainer(i, c)
}
if shouldMount {
indexList = append(indexList, i)
}
}
for _, i := range indexList {
c := &pod.Spec.Containers[i]
if c.VolumeMounts == nil {
c.VolumeMounts = make([]v1.VolumeMount, 0)
// Recursively search for Trial ownership in nested object
jobKind, jobName, err = s.getKatibJob(nestedJob, namespace)
i++
}
c.VolumeMounts = append(c.VolumeMounts, vm)
pod.Spec.Containers[i] = *c
}
pod.Spec.Volumes = append(pod.Spec.Volumes, metricsVol)

return nil
}

func getSidecarContainerName(cKind common.CollectorKind) string {
if cKind == common.StdOutCollector || cKind == common.FileCollector {
return mccommon.MetricLoggerCollectorContainerName
} else {
return mccommon.MetricCollectorContainerName
// If jobKind is empty after the loop, Trial doesn't own the object
if jobKind == "" {
return "", "", errors.New("The Pod doesn't belong to Katib Job")
}

return jobKind, jobName, nil
}
Loading

0 comments on commit cba4560

Please sign in to comment.