Support Pytorch job in Katib #283

Merged
merged 6 commits into from
Dec 13, 2018
9 changes: 9 additions & 0 deletions Gopkg.lock

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -85,3 +85,7 @@ required = [
 [[constraint]]
   name = "github.com/kubeflow/tf-operator"
   version = "0.4.0-rc.1"
+
+[[constraint]]
+  name = "github.com/kubeflow/pytorch-operator"
+  version = "0.4.0-rc.1"
5 changes: 3 additions & 2 deletions cmd/metricscollector/main.go
@@ -52,11 +52,12 @@ import (
 var studyID = flag.String("s", "", "Study ID")
 var trialID = flag.String("t", "", "Trial ID")
 var workerID = flag.String("w", "", "Worker ID")
+var workerKind = flag.String("k", "", "Worker Kind")
 var namespace = flag.String("n", "", "NameSpace")
 
 func main() {
 	flag.Parse()
-	log.Printf("Study ID: %s, Trial ID: %s, Worker ID: %s", *studyID, *trialID, *workerID)
+	log.Printf("Study ID: %s, Trial ID: %s, Worker ID: %s, Worker Kind: %s", *studyID, *trialID, *workerID, *workerKind)
 	conn, err := grpc.Dial(pkg.ManagerAddr, grpc.WithInsecure())
 	if err != nil {
 		log.Fatalf("could not connect: %v", err)
@@ -75,7 +76,7 @@ func main() {
 	if err != nil {
 		log.Fatalf("Failed to GetStudyConf: %v", err)
 	}
-	mls, err := mc.CollectWorkerLog(*workerID, screp.StudyConfig.ObjectiveValueName, screp.StudyConfig.Metrics, *namespace)
+	mls, err := mc.CollectWorkerLog(*workerID, *workerKind, screp.StudyConfig.ObjectiveValueName, screp.StudyConfig.Metrics, *namespace)
 	if err != nil {
 		log.Printf("Failed to collect logs: %v", err)
 		return
93 changes: 93 additions & 0 deletions examples/pytorchjob-example.yaml
@@ -0,0 +1,93 @@
+apiVersion: "kubeflow.org/v1alpha1"
+kind: StudyJob
+metadata:
+  namespace: kubeflow
+  labels:
+    controller-tools.k8s.io: "1.0"
+  name: pytorchjob-example
+spec:
+  studyName: pytorchjob-example
+  owner: crd
+  optimizationtype: maximize
+  objectivevaluename: accuracy
+  optimizationgoal: 0.99
+  requestcount: 4
+  metricsnames:
+    - accuracy
+  parameterconfigs:
+    - name: --lr
+      parametertype: double
+      feasible:
+        min: "0.01"
+        max: "0.05"
+    - name: --momentum
+      parametertype: double
+      feasible:
+        min: "0.5"
+        max: "0.9"
+  workerSpec:
+    retain: true
+    goTemplate:
+      rawTemplate: |-
+        apiVersion: "kubeflow.org/v1beta1"
+        kind: PyTorchJob
+        metadata:
+          name: {{.WorkerID}}
+          namespace: kubeflow
+        spec:
+          pytorchReplicaSpecs:
+            Master:
+              replicas: 1
+              restartPolicy: Never
+              template:
+                spec:
+                  containers:
+                    - name: pytorch
+                      image: gcr.io/kubeflow-ci/pytorch-mnist-with-summary:0.4
+                      imagePullPolicy: Always
+                      command:
+                        - "python"
+                        - "/opt/pytorch_dist_mnist/mnist_with_summary.py"
+                        {{- with .HyperParameters}}
+                        {{- range .}}
+                        - "{{.Name}}={{.Value}}"
+                        {{- end}}
+                        {{- end}}
+  metricsCollectorSpec:
Contributor

Do you also need to configure a PV here?

Member Author

@johnugeorge, Dec 12, 2018

@richardsliu Currently, this example uses the default metrics collector, which parses the worker's stdout logs, so it doesn't need a PV. I will add one more example that uses the TF event metrics collector.
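
To illustrate what "parses the stdout logs" can mean in practice, here is a minimal sketch in Go. It is not the collector's actual implementation: the function name `parseMetrics` is hypothetical, and it assumes the training script prints metrics as whitespace-separated `name=value` tokens.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// parseMetrics scans log text line by line and collects the values of the
// requested metric names. ASSUMPTION: metrics appear in the logs as
// whitespace-separated "name=value" tokens; the real collector may differ.
func parseMetrics(logs string, names []string) map[string][]float64 {
	wanted := make(map[string]bool, len(names))
	for _, n := range names {
		wanted[n] = true
	}
	out := make(map[string][]float64)
	sc := bufio.NewScanner(strings.NewReader(logs))
	for sc.Scan() {
		for _, tok := range strings.Fields(sc.Text()) {
			kv := strings.SplitN(tok, "=", 2)
			if len(kv) != 2 || !wanted[kv[0]] {
				continue
			}
			v, err := strconv.ParseFloat(kv[1], 64)
			if err != nil {
				continue // value is not numeric; skip this token
			}
			out[kv[0]] = append(out[kv[0]], v)
		}
	}
	return out
}

func main() {
	logs := "epoch=1 loss=0.42 accuracy=0.91\nsaving checkpoint\nepoch=2 loss=0.30 accuracy=0.95\n"
	m := parseMetrics(logs, []string{"accuracy"})
	fmt.Println(m["accuracy"]) // prints [0.91 0.95]
}
```

Because the input is just the pod's log text, nothing has to be shared through a volume, which is why no PV is needed for this collector.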

+    retain: true
+    goTemplate:
+      rawTemplate: |-
+        apiVersion: batch/v1beta1
+        kind: CronJob
+        metadata:
+          name: {{.WorkerID}}
+          namespace: kubeflow
+        spec:
+          schedule: "*/1 * * * *"
+          successfulJobsHistoryLimit: 15
+          failedJobsHistoryLimit: 15
+          jobTemplate:
+            spec:
+              template:
+                spec:
+                  serviceAccountName: metrics-collector
+                  containers:
+                    - name: {{.WorkerID}}
+                      image: johnugeorge/metrics-collector
Contributor

Same with this.

Member Author

A new metrics-collector image has to be created once this PR is merged.

Member Author

I will change the image name once the new image is created after this PR is merged.

+                      args:
+                        - "./metricscollector"
+                        - "-s"
+                        - "{{.StudyID}}"
+                        - "-t"
+                        - "{{.TrialID}}"
+                        - "-w"
+                        - "{{.WorkerID}}"
+                        - "-k"
+                        - "{{.WorkerKind}}"
+                        - "-n"
+                        - "{{.NameSpace}}"
+                  restartPolicy: Never
+
+  suggestionSpec:
+    suggestionAlgorithm: "random"
+    requestNumber: 3
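
The `{{- with .HyperParameters}}` / `{{- range .}}` block in the worker template is what turns each suggested parameter into a command-line argument. The following is a rough sketch of that expansion using Go's `text/template`. The `HyperParameter` struct, the `renderCommand` helper, and the shortened `train.py` script name are hypothetical stand-ins for illustration, not Katib's actual types.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// HyperParameter is a hypothetical stand-in for the name/value pairs that
// are passed to the worker template.
type HyperParameter struct {
	Name  string
	Value string
}

// commandTmpl mirrors the command section of the example's rawTemplate.
// The "{{-" markers trim the preceding newline and indentation, so each
// parameter lands on its own list line with no blank lines in between.
const commandTmpl = `command:
  - "python"
  - "train.py"
  {{- with .HyperParameters}}
  {{- range .}}
  - "{{.Name}}={{.Value}}"
  {{- end}}
  {{- end}}
`

// renderCommand executes the template against the given parameters.
func renderCommand(hps []HyperParameter) (string, error) {
	t, err := template.New("worker").Parse(commandTmpl)
	if err != nil {
		return "", err
	}
	data := struct{ HyperParameters []HyperParameter }{HyperParameters: hps}
	var b bytes.Buffer
	if err := t.Execute(&b, data); err != nil {
		return "", err
	}
	return b.String(), nil
}

func main() {
	out, err := renderCommand([]HyperParameter{
		{Name: "--lr", Value: "0.03"},
		{Name: "--momentum", Value: "0.7"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```

With the two parameters above, the rendered list ends in `- "--lr=0.03"` and `- "--momentum=0.7"`. With an empty parameter list, `with` evaluates to false and only the two fixed entries remain.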
2 changes: 2 additions & 0 deletions manifests/studyjobcontroller/metricsControllerConfigMap.yaml
@@ -30,6 +30,8 @@ data:
 - "{{.TrialID}}"
 - "-w"
 - "{{.WorkerID}}"
+- "-k"
+- "{{.WorkerKind}}"
 - "-n"
 - "{{.NameSpace}}"
 restartPolicy: Never
12 changes: 12 additions & 0 deletions manifests/studyjobcontroller/pytorchjobsCrd.yaml
@@ -0,0 +1,12 @@
+apiVersion: apiextensions.k8s.io/v1beta1
+kind: CustomResourceDefinition
+metadata:
+  name: pytorchjobs.kubeflow.org
+spec:
+  group: kubeflow.org
+  version: v1beta1
+  scope: Namespaced
+  names:
+    kind: PyTorchJob
+    singular: pytorchjob
+    plural: pytorchjobs
1 change: 1 addition & 0 deletions manifests/studyjobcontroller/rbac.yaml
@@ -42,6 +42,7 @@ rules:
 - kubeflow.org
 resources:
 - tfjobs
+- pytorchjobs
 verbs:
 - "*"
 ---
22 changes: 22 additions & 0 deletions pkg/api/operators/apis/addtoscheme_pytorchjob_v1beta1.go
@@ -0,0 +1,22 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package apis
+
+import (
+	"github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1"
+)
+
+func init() {
+	// Register the types with the Scheme so the components can map objects to GroupVersionKinds and back
+	AddToSchemes = append(AddToSchemes, v1beta1.SchemeBuilder.AddToScheme)
+}
3 changes: 2 additions & 1 deletion pkg/controller/studyjob/const.go
@@ -13,5 +13,6 @@ package studyjob
 
 const (
 	DefaultJobWorker = "Job"
-	TFJobWorker = "TFJob"
+	TFJobWorker      = "TFJob"
+	PyTorchJobWorker = "PyTorchJob"
 )
11 changes: 6 additions & 5 deletions pkg/controller/studyjob/manifest_parser.go
@@ -130,14 +130,15 @@ func getWorkerManifest(c katibapi.ManagerClient, studyID string, trial *katibapi
 	return wid, &b, nil
 }
 
-func getMetricsCollectorManifest(studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) {
+func getMetricsCollectorManifest(studyID string, trialID string, workerID string, workerKind string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) {
 	var mtp *template.Template = nil
 	var err error
 	tmpValues := map[string]string{
-		"StudyID":   studyID,
-		"TrialID":   trialID,
-		"WorkerID":  workerID,
-		"NameSpace": namespace,
+		"StudyID":    studyID,
+		"TrialID":    trialID,
+		"WorkerID":   workerID,
+		"WorkerKind": workerKind,
+		"NameSpace":  namespace,
 	}
 	if mcs != nil {
 		if mcs.GoTemplate.RawTemplate != "" {
61 changes: 60 additions & 1 deletion pkg/controller/studyjob/studyjob_controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/kubeflow/katib/pkg"
 	katibapi "github.com/kubeflow/katib/pkg/api"
 	katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1"
+	pytorchjobv1beta1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1"
 	commonv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta1"
 	tfjobv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta1"
 
@@ -117,6 +118,16 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}
 
+	err = c.Watch(
+		&source.Kind{Type: &pytorchjobv1beta1.PyTorchJob{}},
+		&handler.EnqueueRequestForOwner{
+			IsController: true,
+			OwnerType:    &katibv1alpha1.StudyJob{},
+		})
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -431,6 +442,10 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
 		if err := r.deleteWorkerResources(instance, &tfjobv1beta1.TFJob{}, ns, w.WorkerID); err != nil {
 			return false, err
 		}
+	case PyTorchJobWorker:
+		if err := r.deleteWorkerResources(instance, &pytorchjobv1beta1.PyTorchJob{}, ns, w.WorkerID); err != nil {
+			return false, err
+		}
 	}
 	continue
 }
@@ -475,6 +490,27 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
 			WorkerState:    state,
 		}
 		update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j)
+	case PyTorchJobWorker:
+		pytorchjob := &pytorchjobv1beta1.PyTorchJob{}
+		nname := types.NamespacedName{Namespace: ns, Name: w.WorkerID}
+		pytorchjoberr := r.Client.Get(context.TODO(), nname, pytorchjob)
+		if pytorchjoberr != nil {
+			continue
+		}
+		var state katibapi.State = katibapi.State_RUNNING
+		if len(pytorchjob.Status.Conditions) > 0 {
+			lc := pytorchjob.Status.Conditions[len(pytorchjob.Status.Conditions)-1]
+			if lc.Type == commonv1beta1.JobSucceeded {
+				state = katibapi.State_COMPLETED
+			} else if lc.Type == commonv1beta1.JobFailed {
+				state = katibapi.State_ERROR
+			}
+		}
+		js := WorkerStatus{
+			CompletionTime: pytorchjob.Status.CompletionTime,
+			WorkerState:    state,
+		}
+		update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j)
 
 	}
 }
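
The status handling in the `PyTorchJobWorker` case above reduces to a small rule: look only at the most recent job condition, and treat anything other than success or failure as still running. A minimal sketch of that rule follows. `ConditionType`, `State`, and `workerState` are hypothetical stand-ins for the real `commonv1beta1` and `katibapi` types, introduced here only so the example runs on its own.

```go
package main

import "fmt"

// ConditionType is a hypothetical stand-in for the operator's job condition type.
type ConditionType string

const (
	JobRunning   ConditionType = "Running"
	JobSucceeded ConditionType = "Succeeded"
	JobFailed    ConditionType = "Failed"
)

// State is a hypothetical stand-in for the worker state reported to Katib.
type State string

const (
	StateRunning   State = "RUNNING"
	StateCompleted State = "COMPLETED"
	StateError     State = "ERROR"
)

// workerState maps the last recorded condition to a worker state.
// With no conditions yet, the worker is assumed to be running.
func workerState(conditions []ConditionType) State {
	if len(conditions) == 0 {
		return StateRunning
	}
	switch conditions[len(conditions)-1] {
	case JobSucceeded:
		return StateCompleted
	case JobFailed:
		return StateError
	default:
		return StateRunning
	}
}

func main() {
	fmt.Println(workerState(nil))                                       // RUNNING
	fmt.Println(workerState([]ConditionType{JobRunning, JobSucceeded})) // COMPLETED
	fmt.Println(workerState([]ConditionType{JobRunning, JobFailed}))    // ERROR
}
```

Pulling the rule into a pure function like this would also let the TFJob and PyTorchJob cases share it, since both read the same condition types.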
@@ -634,14 +670,37 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ
 			log.Printf("TFJob Create error %v", err)
 			return "", err
 		}
+	case PyTorchJobWorker:
+		var pytorchjob pytorchjobv1beta1.PyTorchJob
+		if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(&pytorchjob); err != nil {
+			instance.Status.Condition = katibv1alpha1.ConditionFailed
+			log.Printf("Yaml decode error %v", err)
+			return "", err
+		}
+		if err := controllerutil.SetControllerReference(instance, &pytorchjob, r.scheme); err != nil {
+			instance.Status.Condition = katibv1alpha1.ConditionFailed
+			log.Printf("SetControllerReference error %v", err)
+			return "", err
+		}
+		if err := r.Create(context.TODO(), &pytorchjob); err != nil {
+			instance.Status.Condition = katibv1alpha1.ConditionFailed
+			log.Printf("PytorchJob Create error %v", err)
+			return "", err
+		}
 	}
 	return wid, nil
 }
 
 func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) error {
 	var mcjob batchv1beta.CronJob
 	BUFSIZE := 1024
-	mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, namespace, mcs)
+	wkind, err := getWorkerKind(instance.Spec.WorkerSpec)
+	if err != nil {
+		log.Printf("getWorkerKind error %v", err)
+		instance.Status.Condition = katibv1alpha1.ConditionFailed
+		return err
+	}
+	mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs)
 	if err != nil {
 		log.Printf("getMetricsCollectorManifest error %v", err)
 		return err
13 changes: 11 additions & 2 deletions pkg/manager/metricscollector/meticscollector.go
@@ -13,6 +13,7 @@ import (
 	restclient "k8s.io/client-go/rest"
 
 	"github.com/kubeflow/katib/pkg/api"
+	"github.com/kubeflow/katib/pkg/controller/studyjob"
 )
 
 type MetricsCollector struct {
@@ -34,8 +35,16 @@ func NewMetricsCollector() (*MetricsCollector, error) {
 
 }
 
-func (d *MetricsCollector) CollectWorkerLog(wID string, objectiveValueName string, metrics []string, namespace string) (*api.MetricsLogSet, error) {
-	pl, _ := d.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: "job-name=" + wID, IncludeUninitialized: true})
+func (d *MetricsCollector) CollectWorkerLog(wID string, wkind string, objectiveValueName string, metrics []string, namespace string) (*api.MetricsLogSet, error) {
+	var labelName string
+	if wkind == studyjob.TFJobWorker {
+		labelName = "tf_job_name"
+	} else if wkind == studyjob.PyTorchJobWorker {
+		labelName = "pytorch_job_name"
+	} else {
+		labelName = "job-name"
+	}
+	pl, _ := d.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: labelName + "=" + wID, IncludeUninitialized: true})
Contributor

Which pod will be watched for a PyTorchJob or a TFJob when selecting by pytorch_job_name = wID or tf_job_name = wID?

Member Author

Pods spawned by the PyTorch job and the TF job carry this label key, with its value set to the job name: https://github.com/kubeflow/tf-operator/blob/master/pkg/common/jobcontroller/jobcontroller.go#L190

Contributor

Yes. My question is that a PyTorch job or a TF job will create several pods, while the metrics collector gets logs from only one pod.
Which of the pods created by a PyTorch job does the metrics collector read logs from?

Member Author

@johnugeorge, Dec 13, 2018

Currently, it uses only one worker. If there are more workers, the Master can take responsibility for emitting logs. We need to discuss better ways to handle distributed jobs separately. @richardsliu
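
For reference, the selection rule discussed in this thread can be written as a small pure function. The sketch below restates the label keys and worker kind constants from this PR; the helper name `podLabelSelector` is hypothetical and is not part of the change.

```go
package main

import "fmt"

// Worker kinds as defined in pkg/controller/studyjob/const.go in this PR.
const (
	DefaultJobWorker = "Job"
	TFJobWorker      = "TFJob"
	PyTorchJobWorker = "PyTorchJob"
)

// podLabelSelector returns the label selector used to find the pods of a
// worker. TFJob and PyTorchJob pods are labeled with the job name under an
// operator-specific key; anything else falls back to the "job-name" label
// that Kubernetes batch Jobs put on their pods.
func podLabelSelector(workerKind, workerID string) string {
	var key string
	switch workerKind {
	case TFJobWorker:
		key = "tf_job_name"
	case PyTorchJobWorker:
		key = "pytorch_job_name"
	default:
		key = "job-name"
	}
	return key + "=" + workerID
}

func main() {
	fmt.Println(podLabelSelector(PyTorchJobWorker, "w123")) // pytorch_job_name=w123
	fmt.Println(podLabelSelector(TFJobWorker, "w123"))      // tf_job_name=w123
	fmt.Println(podLabelSelector(DefaultJobWorker, "w123")) // job-name=w123
}
```

The selector matches every pod of the job, so with more than one replica it does not by itself decide which pod's logs are read. That is the open question raised above.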

 	if len(pl.Items) == 0 {
 		return nil, errors.New(fmt.Sprintf("No Pods are found in Job %v", wID))
 	}
1 change: 1 addition & 0 deletions scripts/deploy.sh
@@ -34,6 +34,7 @@ kubectl apply -f manifests/vizier/suggestion/hyperband
 kubectl apply -f manifests/vizier/suggestion/bayesianoptimization
 kubectl apply -f manifests/studyjobcontroller/crd.yaml
 kubectl apply -f manifests/studyjobcontroller/tfjobsCrd.yaml
+kubectl apply -f manifests/studyjobcontroller/pytorchjobsCrd.yaml
 kubectl apply -f manifests/studyjobcontroller/rbac.yaml
 kubectl apply -f manifests/studyjobcontroller/mcrbac.yaml
 kubectl apply -f manifests/studyjobcontroller/workerConfigMap.yaml
1 change: 1 addition & 0 deletions test/scripts/run-tests.sh
@@ -102,6 +102,7 @@ kubectl -n kubeflow get pod
 
 kubectl -n kubeflow port-forward $(kubectl -n kubeflow get pod -o=name | grep vizier-core | grep -v vizier-core-rest | sed -e "s@pods\/@@") 6789:6789 &
 echo "kubectl port-forward start"
+sleep 5
 TIMEOUT=120
 until curl localhost:6789 || [ $TIMEOUT -eq 0 ]; do
   sleep 5