diff --git a/Gopkg.lock b/Gopkg.lock index e03c1f2f44a..5e00b59f670 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -321,6 +321,14 @@ revision = "1624edc4454b8682399def8740d46db5e4362ba4" version = "v1.1.5" +[[projects]] + digest = "1:11eef84ee2fae64144174a994ee8b5c96c028c091565a325303d771d2c73e705" + name = "github.com/kubeflow/pytorch-operator" + packages = ["pkg/apis/pytorch/v1beta1"] + pruneopts = "T" + revision = "97b3b9974baf73959329ae9ee4d18ef5b85a61a7" + version = "v0.4.0-rc.1" + [[projects]] digest = "1:931643b24140b2960fb80e46d15c68587485c41d1ee9d9f887563e0d11ab8f15" name = "github.com/kubeflow/tf-operator" @@ -1050,6 +1058,7 @@ "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger", "github.com/grpc-ecosystem/grpc-gateway/runtime", "github.com/grpc-ecosystem/grpc-gateway/utilities", + "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1", "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta1", "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta1", "github.com/onsi/ginkgo", diff --git a/Gopkg.toml b/Gopkg.toml index 11114237894..3b86b0af8b6 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -85,3 +85,7 @@ required = [ [[constraint]] name = "github.com/kubeflow/tf-operator" version = "0.4.0-rc.1" + +[[constraint]] + name = "github.com/kubeflow/pytorch-operator" + version = "0.4.0-rc.1" diff --git a/cmd/metricscollector/main.go b/cmd/metricscollector/main.go index f76a5641a49..ad22aec6208 100644 --- a/cmd/metricscollector/main.go +++ b/cmd/metricscollector/main.go @@ -52,11 +52,12 @@ import ( var studyID = flag.String("s", "", "Study ID") var trialID = flag.String("t", "", "Trial ID") var workerID = flag.String("w", "", "Worker ID") +var workerKind = flag.String("k", "", "Worker Kind") var namespace = flag.String("n", "", "NameSpace") func main() { flag.Parse() - log.Printf("Study ID: %s, Trial ID: %s, Worker ID: %s", *studyID, *trialID, *workerID) + log.Printf("Study ID: %s, Trial ID: %s, Worker ID: %s, Worker Kind: %s", *studyID, *trialID, *workerID, *workerKind) conn, err := grpc.Dial(pkg.ManagerAddr, grpc.WithInsecure()) if err != nil { log.Fatalf("could not connect: %v", err) @@ -75,7 +76,7 @@ func main() { if err != nil { log.Fatalf("Failed to GetStudyConf: %v", err) } - mls, err := mc.CollectWorkerLog(*workerID, screp.StudyConfig.ObjectiveValueName, screp.StudyConfig.Metrics, *namespace) + mls, err := mc.CollectWorkerLog(*workerID, *workerKind, screp.StudyConfig.ObjectiveValueName, screp.StudyConfig.Metrics, *namespace) if err != nil { log.Printf("Failed to collect logs: %v", err) return diff --git a/examples/pytorchjob-example.yaml b/examples/pytorchjob-example.yaml new file mode 100644 index 00000000000..232a28301f2 --- /dev/null +++ b/examples/pytorchjob-example.yaml @@ -0,0 +1,93 @@ +apiVersion: "kubeflow.org/v1alpha1" +kind: StudyJob +metadata: + namespace: kubeflow + labels: + controller-tools.k8s.io: "1.0" + name: pytorchjob-example +spec: + studyName: pytorchjob-example + owner: crd + optimizationtype: maximize + objectivevaluename: accuracy + optimizationgoal: 0.99 + requestcount: 4 + metricsnames: + - accuracy + parameterconfigs: + - name: --lr + parametertype: double + feasible: + min: "0.01" + max: "0.05" + - name: --momentum + parametertype: double + feasible: + min: "0.5" + max: "0.9" + workerSpec: + retain: true + goTemplate: + rawTemplate: |- + apiVersion: "kubeflow.org/v1beta1" + kind: PyTorchJob + metadata: + name: {{.WorkerID}} + namespace: kubeflow + spec: + pytorchReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: pytorch + image: gcr.io/kubeflow-ci/pytorch-mnist-with-summary:0.4 + imagePullPolicy: Always + command: + - "python" + - "/opt/pytorch_dist_mnist/mnist_with_summary.py" + {{- with .HyperParameters}} + {{- range .}} + - "{{.Name}}={{.Value}}" + {{- end}} + {{- end}} + metricsCollectorSpec: + retain: true + goTemplate: + rawTemplate: |- + apiVersion: batch/v1beta1 + kind: CronJob + metadata: + name: {{.WorkerID}} + namespace: kubeflow + spec: + schedule: "*/1 * * * *" + successfulJobsHistoryLimit: 15 + failedJobsHistoryLimit: 15 + jobTemplate: + spec: + template: + spec: + serviceAccountName: metrics-collector + containers: + - name: {{.WorkerID}} + image: johnugeorge/metrics-collector + args: + - "./metricscollector" + - "-s" + - "{{.StudyID}}" + - "-t" + - "{{.TrialID}}" + - "-w" + - "{{.WorkerID}}" + - "-k" + - "{{.WorkerKind}}" + - "-n" + - "{{.NameSpace}}" + restartPolicy: Never + + suggestionSpec: + suggestionAlgorithm: "random" + requestNumber: 3 diff --git a/manifests/studyjobcontroller/metricsControllerConfigMap.yaml b/manifests/studyjobcontroller/metricsControllerConfigMap.yaml index 02b750c725e..ae26cb138b7 100644 --- a/manifests/studyjobcontroller/metricsControllerConfigMap.yaml +++ b/manifests/studyjobcontroller/metricsControllerConfigMap.yaml @@ -30,6 +30,8 @@ data: - "{{.TrialID}}" - "-w" - "{{.WorkerID}}" + - "-k" + - "{{.WorkerKind}}" - "-n" - "{{.NameSpace}}" restartPolicy: Never diff --git a/manifests/studyjobcontroller/pytorchjobsCrd.yaml b/manifests/studyjobcontroller/pytorchjobsCrd.yaml new file mode 100644 index 00000000000..b2dd419b8d4 --- /dev/null +++ b/manifests/studyjobcontroller/pytorchjobsCrd.yaml @@ -0,0 +1,12 @@ +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: pytorchjobs.kubeflow.org +spec: + group: kubeflow.org + version: v1beta1 + scope: Namespaced + names: + kind: PyTorchJob + singular: pytorchjob + plural: pytorchjobs diff --git a/manifests/studyjobcontroller/rbac.yaml b/manifests/studyjobcontroller/rbac.yaml index ccf9749f4ba..db1013d609e 100644 --- a/manifests/studyjobcontroller/rbac.yaml +++ b/manifests/studyjobcontroller/rbac.yaml @@ -42,6 +42,7 @@ rules: - kubeflow.org resources: - tfjobs + - pytorchjobs verbs: - "*" --- diff --git a/pkg/api/operators/apis/addtoscheme_pytorchjob_v1beta1.go b/pkg/api/operators/apis/addtoscheme_pytorchjob_v1beta1.go new file mode 100644 index 00000000000..9eee8cef54d --- /dev/null +++ b/pkg/api/operators/apis/addtoscheme_pytorchjob_v1beta1.go @@ -0,0 +1,22 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +import ( + "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1" +) + +func init() { + // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back + AddToSchemes = append(AddToSchemes, v1beta1.SchemeBuilder.AddToScheme) +} diff --git a/pkg/controller/studyjob/const.go b/pkg/controller/studyjob/const.go index e0344510f73..61c8e64f8fb 100644 --- a/pkg/controller/studyjob/const.go +++ b/pkg/controller/studyjob/const.go @@ -13,5 +13,6 @@ package studyjob const ( DefaultJobWorker = "Job" - TFJobWorker = "TFJob" + TFJobWorker = "TFJob" + PyTorchJobWorker = "PyTorchJob" ) diff --git a/pkg/controller/studyjob/manifest_parser.go b/pkg/controller/studyjob/manifest_parser.go index 95f633f3e47..c003355a0f3 100644 --- a/pkg/controller/studyjob/manifest_parser.go +++ b/pkg/controller/studyjob/manifest_parser.go @@ -130,14 +130,15 @@ func getWorkerManifest(c katibapi.ManagerClient, studyID string, trial *katibapi return wid, &b, nil } -func getMetricsCollectorManifest(studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) { +func getMetricsCollectorManifest(studyID string, trialID string, workerID string, workerKind string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) { var mtp *template.Template = nil var err error tmpValues := map[string]string{ - "StudyID": studyID, - "TrialID": trialID, - "WorkerID": workerID, - "NameSpace": namespace, + "StudyID": studyID, + "TrialID": trialID, + "WorkerID": workerID, + "WorkerKind": workerKind, + "NameSpace": namespace, } if mcs != nil { if mcs.GoTemplate.RawTemplate != "" { diff --git a/pkg/controller/studyjob/studyjob_controller.go b/pkg/controller/studyjob/studyjob_controller.go index be185940ff7..df35a19755d 100644 --- a/pkg/controller/studyjob/studyjob_controller.go +++ b/pkg/controller/studyjob/studyjob_controller.go @@ -24,6 +24,7 @@ import ( "github.com/kubeflow/katib/pkg" katibapi "github.com/kubeflow/katib/pkg/api" katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1" + pytorchjobv1beta1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1" commonv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta1" tfjobv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta1" @@ -117,6 +118,16 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } + err = c.Watch( + &source.Kind{Type: &pytorchjobv1beta1.PyTorchJob{}}, + &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &katibv1alpha1.StudyJob{}, + }) + if err != nil { + return err + } + return nil } @@ -431,6 +442,10 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ if err := r.deleteWorkerResources(instance, &tfjobv1beta1.TFJob{}, ns, w.WorkerID); err != nil { return false, err } + case PyTorchJobWorker: + if err := r.deleteWorkerResources(instance, &pytorchjobv1beta1.PyTorchJob{}, ns, w.WorkerID); err != nil { + return false, err + } } continue } @@ -475,6 +490,27 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ WorkerState: state, } update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j) + case PyTorchJobWorker: + pytorchjob := &pytorchjobv1beta1.PyTorchJob{} + nname := types.NamespacedName{Namespace: ns, Name: w.WorkerID} + pytorchjoberr := r.Client.Get(context.TODO(), nname, pytorchjob) + if pytorchjoberr != nil { + continue + } + var state katibapi.State = katibapi.State_RUNNING + if len(pytorchjob.Status.Conditions) > 0 { + lc := pytorchjob.Status.Conditions[len(pytorchjob.Status.Conditions)-1] + if lc.Type == commonv1beta1.JobSucceeded { + state = katibapi.State_COMPLETED + } else if lc.Type == commonv1beta1.JobFailed { + state = katibapi.State_ERROR + } + } + js := WorkerStatus{ + CompletionTime: pytorchjob.Status.CompletionTime, + WorkerState: state, + } + update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j) } } @@ -634,6 +670,23 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ log.Printf("TFJob Create error %v", err) return "", err } + case PyTorchJobWorker: + var pytorchjob pytorchjobv1beta1.PyTorchJob + if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(&pytorchjob); err != nil { + instance.Status.Condition = katibv1alpha1.ConditionFailed + log.Printf("Yaml decode error %v", err) + return "", err + } + if err := controllerutil.SetControllerReference(instance, &pytorchjob, r.scheme); err != nil { + instance.Status.Condition = katibv1alpha1.ConditionFailed + log.Printf("SetControllerReference error %v", err) + return "", err + } + if err := r.Create(context.TODO(), &pytorchjob); err != nil { + instance.Status.Condition = katibv1alpha1.ConditionFailed + log.Printf("PytorchJob Create error %v", err) + return "", err + } } return wid, nil } @@ -641,7 +694,13 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) error { var mcjob batchv1beta.CronJob BUFSIZE := 1024 - mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, namespace, mcs) + wkind, err := getWorkerKind(instance.Spec.WorkerSpec) + if err != nil { + log.Printf("getWorkerKind error %v", err) + instance.Status.Condition = katibv1alpha1.ConditionFailed + return err + } + mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs) if err != nil { log.Printf("getMetricsCollectorManifest error %v", err) return err diff --git a/pkg/manager/metricscollector/meticscollector.go b/pkg/manager/metricscollector/meticscollector.go index 78b317938d0..afac5313fa0 100644 --- a/pkg/manager/metricscollector/meticscollector.go +++ b/pkg/manager/metricscollector/meticscollector.go @@ -13,6 +13,7 @@ import ( restclient "k8s.io/client-go/rest" "github.com/kubeflow/katib/pkg/api" + "github.com/kubeflow/katib/pkg/controller/studyjob" ) type MetricsCollector struct { @@ -34,8 +35,16 @@ func NewMetricsCollector() (*MetricsCollector, error) { } -func (d *MetricsCollector) CollectWorkerLog(wID string, objectiveValueName string, metrics []string, namespace string) (*api.MetricsLogSet, error) { - pl, _ := d.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: "job-name=" + wID, IncludeUninitialized: true}) +func (d *MetricsCollector) CollectWorkerLog(wID string, wkind string, objectiveValueName string, metrics []string, namespace string) (*api.MetricsLogSet, error) { + var labelName string + if wkind == studyjob.TFJobWorker { + labelName = "tf_job_name" + } else if wkind == studyjob.PyTorchJobWorker { + labelName = "pytorch_job_name" + } else { + labelName = "job-name" + } + pl, _ := d.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: labelName + "=" + wID, IncludeUninitialized: true}) if len(pl.Items) == 0 { return nil, errors.New(fmt.Sprintf("No Pods are found in Job %v", wID)) } diff --git a/scripts/deploy.sh b/scripts/deploy.sh index 1939716c925..6f106ff5f43 100755 --- a/scripts/deploy.sh +++ b/scripts/deploy.sh @@ -34,6 +34,7 @@ kubectl apply -f manifests/vizier/suggestion/hyperband kubectl apply -f manifests/vizier/suggestion/bayesianoptimization kubectl apply -f manifests/studyjobcontroller/crd.yaml kubectl apply -f manifests/studyjobcontroller/tfjobsCrd.yaml +kubectl apply -f manifests/studyjobcontroller/pytorchjobsCrd.yaml kubectl apply -f manifests/studyjobcontroller/rbac.yaml kubectl apply -f manifests/studyjobcontroller/mcrbac.yaml kubectl apply -f manifests/studyjobcontroller/workerConfigMap.yaml diff --git a/test/scripts/run-tests.sh b/test/scripts/run-tests.sh index c2ca3eacca4..e1720ef8194 100755 --- a/test/scripts/run-tests.sh +++ b/test/scripts/run-tests.sh @@ -102,6 +102,7 @@ kubectl -n kubeflow get pod kubectl -n kubeflow port-forward $(kubectl -n kubeflow get pod -o=name | grep vizier-core | grep -v vizier-core-rest | sed -e "s@pods\/@@") 6789:6789 & echo "kubectl port-forward start" +sleep 5 TIMEOUT=120 until curl localhost:6789 || [ $TIMEOUT -eq 0 ]; do sleep 5