From d41f56eb81ea09819f6f796e098c1f95874e6afd Mon Sep 17 00:00:00 2001 From: hougang liu Date: Sun, 22 Sep 2019 23:02:00 +0800 Subject: [PATCH 1/2] Implement tfevent collector --- .../v1alpha3/file-metricscollector/main.go | 6 +-- .../tfevent-metricscollector/Dockerfile | 6 ++- .../v1alpha3/tfevent-metricscollector/main.py | 16 +++++--- .../tfevent-metricscollector/requirements.txt | 1 + cmd/tfevent-metricscollector/v1alpha2/main.py | 2 +- examples/v1alpha3/tfjob-example.yaml | 18 ++++----- .../v1alpha3/common/__init__.py | 0 pkg/metricscollector/v1alpha3/common/pns.py | 39 +++++++++++++++++++ pkg/webhook/v1alpha3/pod/inject_webhook.go | 24 +++++++----- 9 files changed, 82 insertions(+), 30 deletions(-) create mode 100644 cmd/metricscollector/v1alpha3/tfevent-metricscollector/requirements.txt create mode 100644 pkg/metricscollector/v1alpha3/common/__init__.py create mode 100644 pkg/metricscollector/v1alpha3/common/pns.py diff --git a/cmd/metricscollector/v1alpha3/file-metricscollector/main.go b/cmd/metricscollector/v1alpha3/file-metricscollector/main.go index 2f98e972c7a..aa438a7bd19 100644 --- a/cmd/metricscollector/v1alpha3/file-metricscollector/main.go +++ b/cmd/metricscollector/v1alpha3/file-metricscollector/main.go @@ -50,10 +50,10 @@ import ( filemc "github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/file-metricscollector" ) -var metricsFileName = flag.String("f", "", "Metrics File Name") +var metricsFileName = flag.String("path", "", "Metrics File Path") var trialName = flag.String("t", "", "Trial Name") -var managerService = flag.String("m", "", "Katib Manager service") -var metricNames = flag.String("mn", "", "Metric names") +var managerService = flag.String("s", "", "Katib Manager service") +var metricNames = flag.String("m", "", "Metric names") var pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval to check if main process of worker container exit") var timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout to check if main process of worker container exit") var waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting") diff --git a/cmd/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile b/cmd/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile index 6ca78983db5..1426ed9a86f 100644 --- a/cmd/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile +++ b/cmd/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile @@ -1,5 +1,7 @@ FROM tensorflow/tensorflow:1.11.0 RUN pip install rfc3339 grpcio googleapis-common-protos ADD . /usr/src/app/github.com/kubeflow/katib -WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/tfevent-metricscollector/v1alpha3 -ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/tfevent-metricscollector/ +WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/metricscollector/v1alpha3/tfevent-metricscollector/ +RUN pip install --no-cache-dir -r requirements.txt +ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/tfevent-metricscollector/:/usr/src/app/github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/common/ +ENTRYPOINT ["python", "main.py"] diff --git a/cmd/metricscollector/v1alpha3/tfevent-metricscollector/main.py b/cmd/metricscollector/v1alpha3/tfevent-metricscollector/main.py index d7e71ff8c04..0fdc725828b 100644 --- a/cmd/metricscollector/v1alpha3/tfevent-metricscollector/main.py +++ b/cmd/metricscollector/v1alpha3/tfevent-metricscollector/main.py @@ -2,6 +2,7 @@ import argparse import api_pb2 import api_pb2_grpc +from pns import WaitOtherMainProcesses from tfevent_loader import MetricsCollector from logging import getLogger, StreamHandler, INFO @@ -10,10 +11,9 @@ def parse_options(): description='TF-Event MetricsCollector', add_help = True ) - parser.add_argument("-a", "--manager_addr", type = str, default = "katib-manager") - parser.add_argument("-p", "--manager_port", type = int, default = 6789 ) + parser.add_argument("-s", "--manager_server_addr", type = str, default = "katib-manager:6789") parser.add_argument("-t", "--trial_name", type = str, default = "") - parser.add_argument("-d", "--log_dir", type = str, default = "/log") + parser.add_argument("-path", "--dir_path", type = str, default = "/log") parser.add_argument("-m", "--metric_names", type = str, default = "") opt = parser.parse_args() return opt @@ -26,11 +26,17 @@ def parse_options(): logger.addHandler(handler) logger.propagate = False opt = parse_options() + manager_server = opt.manager_server_addr.split(':') + if len(manager_server) != 2: + raise Exception("Invalid katib manager service address: %s" % opt.manager_server_addr) + WaitOtherMainProcesses() + mc = MetricsCollector(opt.metric_names.split(',')) - observation_log = mc.parse_file(opt.log_dir) + observation_log = mc.parse_file(opt.dir_path) + + channel = grpc.beta.implementations.insecure_channel(manager_server[0], int(manager_server[1])) - channel = grpc.beta.implementations.insecure_channel(opt.manager_addr, opt.manager_port) with api_pb2.beta_create_Manager_stub(channel) as client: logger.info("In " + opt.trial_name + " " + str(len(observation_log.metric_logs)) + " metrics will be reported.") client.ReportObservationLog(api_pb2.ReportObservationLogRequest( diff --git a/cmd/metricscollector/v1alpha3/tfevent-metricscollector/requirements.txt b/cmd/metricscollector/v1alpha3/tfevent-metricscollector/requirements.txt new file mode 100644 index 00000000000..4cc0c6c0ed4 --- /dev/null +++ b/cmd/metricscollector/v1alpha3/tfevent-metricscollector/requirements.txt @@ -0,0 +1 @@ +psutil==5.2.2 diff --git a/cmd/tfevent-metricscollector/v1alpha2/main.py b/cmd/tfevent-metricscollector/v1alpha2/main.py index d7e71ff8c04..ef09582266f 100644 --- a/cmd/tfevent-metricscollector/v1alpha2/main.py +++ b/cmd/tfevent-metricscollector/v1alpha2/main.py @@ -36,4 +36,4 @@ def parse_options(): client.ReportObservationLog(api_pb2.ReportObservationLogRequest( trial_name=opt.trial_name, observation_log=observation_log - ), 10) + ), 10) \ No newline at end of file diff --git a/examples/v1alpha3/tfjob-example.yaml b/examples/v1alpha3/tfjob-example.yaml index 9b3d6f27960..6d6e1826164 100644 --- a/examples/v1alpha3/tfjob-example.yaml +++ b/examples/v1alpha3/tfjob-example.yaml @@ -12,7 +12,14 @@ spec: goal: 0.99 objectiveMetricName: accuracy_1 algorithm: - algorithmName: random + algorithmName: hyperopt-random + metricsCollectorSpec: + source: + fileSystemPath: + path: /train + kind: Directory + collector: + kind: TensorFlowEvent parameters: - name: --learning_rate parameterType: double @@ -47,16 +54,9 @@ spec: command: - "python" - "/var/tf_mnist/mnist_with_summaries.py" - - "--log_dir=/train/{{.Trial}}" + - "--log_dir=/train/metrics" {{- with .HyperParameters}} {{- range .}} - "{{.Name}}={{.Value}}" {{- end}} {{- end}} - volumeMounts: - - mountPath: "/train" - name: "train" - volumes: - - name: "train" - persistentVolumeClaim: - claimName: "tfevent-volume" \ No newline at end of file diff --git a/pkg/metricscollector/v1alpha3/common/__init__.py b/pkg/metricscollector/v1alpha3/common/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pkg/metricscollector/v1alpha3/common/pns.py b/pkg/metricscollector/v1alpha3/common/pns.py new file mode 100644 index 00000000000..dc3895ad20e --- /dev/null +++ b/pkg/metricscollector/v1alpha3/common/pns.py @@ -0,0 +1,39 @@ +import os +import psutil +import time + +def GetOtherMainProcesses(): + this_pid = psutil.Process().pid + pids = set() + for proc in psutil.process_iter(): + pid = proc.pid + ppid = proc.ppid() + if pid == 1 or pid == this_pid or ppid != 0: + # ignore the pause container, our own pid, and non-root processes + continue + pids.add(pid) + return pids + +def WaitPIDs(pids, poll_interval_seconds=1, timeout_seconds=0, is_wait_all=False): + start = 0 + pids = set(pids) + if poll_interval_seconds <= 0: + raise Exception("Poll interval seconds must be a positive integer") + while (timeout_seconds <= 0 or start < timeout_seconds) and len(pids) > 0: + stop_pids = set() + for pid in pids: + path = "/proc/%d" % pid + if os.path.isdir(path): + continue + else: + if is_wait_all: + stop_pids.add(pid) + else: + return + if is_wait_all: + pids = pids - stop_pids + time.sleep(poll_interval_seconds) + start = start + poll_interval_seconds + +def WaitOtherMainProcesses(poll_interval_seconds=1, timeout_seconds=0, is_wait_all=False): + return WaitPIDs(GetOtherMainProcesses(), poll_interval_seconds, timeout_seconds, is_wait_all) diff --git a/pkg/webhook/v1alpha3/pod/inject_webhook.go b/pkg/webhook/v1alpha3/pod/inject_webhook.go index cf5edd19cb9..7dc61681c41 100644 --- a/pkg/webhook/v1alpha3/pod/inject_webhook.go +++ b/pkg/webhook/v1alpha3/pod/inject_webhook.go @@ -159,9 +159,9 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) mutatedPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName mutatedPod.Spec.ShareProcessNamespace = pointer.BoolPtr(true) - if mountFile := getMountFile(trial.Spec.MetricsCollector); mountFile != "" { - wrapWorkerContainer(mutatedPod, kind, mountFile, trial.Spec.MetricsCollector) - if err = mutateVolume(mutatedPod, kind, mountFile); err != nil { + if mountPath := getMountPath(trial.Spec.MetricsCollector); mountPath != "" { + wrapWorkerContainer(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector) + if err = mutateVolume(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector.Source.FileSystemPath.Kind); err != nil { return nil, err } } @@ -203,17 +203,17 @@ func (s *sidecarInjector) getMetricsCollectorImage(cKind common.CollectorKind) ( } func getMetricsCollectorArgs(trialName, metricName string, mc trialsv1alpha3.MetricsCollectorSpec) []string { - args := []string{"-t", trialName, "-m", katibmanagerv1alpha3.GetManagerAddr(), "-mn", metricName} - if mountFile := getMountFile(mc); mountFile != "" { - args = append(args, "-f", mountFile) + args := []string{"-t", trialName, "-m", metricName, "-s", katibmanagerv1alpha3.GetManagerAddr()} + if mountPath := getMountPath(mc); mountPath != "" { + args = append(args, "-path", mountPath) } return args } -func getMountFile(mc trialsv1alpha3.MetricsCollectorSpec) string { +func getMountPath(mc trialsv1alpha3.MetricsCollectorSpec) string { if mc.Collector.Kind == common.StdOutCollector { return common.DefaultFilePath - } else if mc.Collector.Kind == common.FileCollector { + } else if mc.Collector.Kind == common.FileCollector || mc.Collector.Kind == common.TfEventCollector { return mc.Source.FileSystemPath.Path } else { return "" @@ -273,16 +273,20 @@ func isWorkerContainer(jobKind string, index int, c v1.Container) bool { return false } -func mutateVolume(pod *v1.Pod, jobKind, mountFile string) error { +func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSystemKind) error { metricsVol := v1.Volume{ Name: common.MetricsVolume, VolumeSource: v1.VolumeSource{ EmptyDir: &v1.EmptyDirVolumeSource{}, }, } + dir := mountPath + if pathKind == common.FileKind { + dir = filepath.Dir(mountPath) + } vm := v1.VolumeMount{ Name: metricsVol.Name, - MountPath: filepath.Dir(mountFile), + MountPath: dir, } index_list := []int{} for i, c := range pod.Spec.Containers { From 83553997a9039df383ac424d0a65e6829dc34f23 Mon Sep 17 00:00:00 2001 From: hougang liu Date: Mon, 23 Sep 2019 10:29:05 +0800 Subject: [PATCH 2/2] Fix stdout collector error --- .../controller/common/v1alpha3/common_types.go | 1 + pkg/webhook/v1alpha3/pod/inject_webhook.go | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/apis/controller/common/v1alpha3/common_types.go b/pkg/apis/controller/common/v1alpha3/common_types.go index 447e2b1e010..36be85a5618 100644 --- a/pkg/apis/controller/common/v1alpha3/common_types.go +++ b/pkg/apis/controller/common/v1alpha3/common_types.go @@ -96,6 +96,7 @@ type FileSystemKind string const ( DirectoryKind FileSystemKind = "Directory" FileKind FileSystemKind = "File" + InvalidKind FileSystemKind = "Invalid" ) // +k8s:deepcopy-gen=true diff --git a/pkg/webhook/v1alpha3/pod/inject_webhook.go b/pkg/webhook/v1alpha3/pod/inject_webhook.go index 7dc61681c41..de8a9553ef8 100644 --- a/pkg/webhook/v1alpha3/pod/inject_webhook.go +++ b/pkg/webhook/v1alpha3/pod/inject_webhook.go @@ -159,9 +159,9 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) mutatedPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName mutatedPod.Spec.ShareProcessNamespace = pointer.BoolPtr(true) - if mountPath := getMountPath(trial.Spec.MetricsCollector); mountPath != "" { + if mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector); mountPath != "" { wrapWorkerContainer(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector) - if err = mutateVolume(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector.Source.FileSystemPath.Kind); err != nil { + if err = mutateVolume(mutatedPod, kind, mountPath, pathKind); err != nil { return nil, err } } @@ -204,19 +204,21 @@ func (s *sidecarInjector) getMetricsCollectorImage(cKind common.CollectorKind) ( func getMetricsCollectorArgs(trialName, metricName string, mc trialsv1alpha3.MetricsCollectorSpec) []string { args := []string{"-t", trialName, "-m", metricName, "-s", katibmanagerv1alpha3.GetManagerAddr()} - if mountPath := getMountPath(mc); mountPath != "" { + if mountPath, _ := getMountPath(mc); mountPath != "" { args = append(args, "-path", mountPath) } return args } -func getMountPath(mc trialsv1alpha3.MetricsCollectorSpec) string { +func getMountPath(mc trialsv1alpha3.MetricsCollectorSpec) (string, common.FileSystemKind) { if mc.Collector.Kind == common.StdOutCollector { - return common.DefaultFilePath - } else if mc.Collector.Kind == common.FileCollector || mc.Collector.Kind == common.TfEventCollector { - return mc.Source.FileSystemPath.Path + return common.DefaultFilePath, common.FileKind + } else if mc.Collector.Kind == common.FileCollector { + return mc.Source.FileSystemPath.Path, common.FileKind + } else if mc.Collector.Kind == common.TfEventCollector { + return mc.Source.FileSystemPath.Path, common.DirectoryKind } else { - return "" + return "", common.InvalidKind } }