Skip to content

Commit

Permalink
Custom CRD: Set dynamic watch from controller flags (#1302)
Browse files Browse the repository at this point in the history
  • Loading branch information
andreyvelich authored Sep 2, 2020
1 parent ced8496 commit ef6557a
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cmd/katib-controller/v1beta1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apis "github.com/kubeflow/katib/pkg/apis/controller"
controller "github.com/kubeflow/katib/pkg/controller.v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util"
webhook "github.com/kubeflow/katib/pkg/webhook/v1beta1"
)

Expand All @@ -44,6 +45,7 @@ func main() {
var injectSecurityContext bool
var serviceName string
var enableGRPCProbeInSuggestion bool
var trialResources trialutil.GvkListFlag

flag.StringVar(&experimentSuggestionName, "experiment-suggestion-name",
"default", "The implementation of suggestion interface in experiment controller (default)")
Expand All @@ -53,6 +55,7 @@ func main() {
flag.BoolVar(&injectSecurityContext, "webhook-inject-securitycontext", false, "Inject the securityContext of container[0] in the sidecar")
flag.StringVar(&serviceName, "webhook-service-name", "katib-controller", "The service name which will be used in webhook")
flag.BoolVar(&enableGRPCProbeInSuggestion, "enable-grpc-probe-in-suggestion", true, "enable grpc probe in suggestions")
flag.Var(&trialResources, "trial-resources", "The list of resources that can be used as trial template, in the form: Kind.version.group (e.g. TFJob.v1.kubeflow.org)")

flag.Parse()

Expand All @@ -61,6 +64,7 @@ func main() {
viper.Set(consts.ConfigCertLocalFS, certLocalFS)
viper.Set(consts.ConfigInjectSecurityContext, injectSecurityContext)
viper.Set(consts.ConfigEnableGRPCProbeInSuggestion, enableGRPCProbeInSuggestion)
viper.Set(consts.ConfigTrialResources, trialResources)

log.Info("Config:",
consts.ConfigExperimentSuggestionName,
Expand All @@ -75,6 +79,8 @@ func main() {
viper.GetBool(consts.ConfigInjectSecurityContext),
consts.ConfigEnableGRPCProbeInSuggestion,
viper.GetBool(consts.ConfigEnableGRPCProbeInSuggestion),
"trial-resources",
viper.Get(consts.ConfigTrialResources),
)

// Get a config to talk to the apiserver
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller.v1beta1/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const (
// ConfigEnableGRPCProbeInSuggestion is the config name which indicates
// if we should set GRPC probe in suggestion deployments.
ConfigEnableGRPCProbeInSuggestion = "enable-grpc-probe-in-suggestion"
// ConfigTrialResources is the config name which indicates
// resources list which can be used as trial template
ConfigTrialResources = "trial-resources"

// LabelExperimentName is the label of experiment name.
LabelExperimentName = "experiment"
Expand Down
31 changes: 31 additions & 0 deletions pkg/controller.v1beta1/trial/trial_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
"github.com/kubeflow/katib/pkg/controller.v1beta1/trial/managerclient"
trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util"
"github.com/kubeflow/katib/pkg/controller.v1beta1/util"
jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1"
"github.com/spf13/viper"
)

const (
Expand Down Expand Up @@ -124,6 +126,35 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
log.Info("Job watch added successfully", "CRD Kind", gvk.Kind)
}
}

trialResources := viper.Get(consts.ConfigTrialResources)
if trialResources != nil {
// Cast interface to gvk slice object
gvkList := trialResources.(trialutil.GvkListFlag)

// Watch for changes in custom resources
for _, gvk := range gvkList {
unstructuredJob := &unstructured.Unstructured{}
unstructuredJob.SetGroupVersionKind(gvk)
err = c.Watch(
&source.Kind{Type: unstructuredJob},
&handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &trialsv1beta1.Trial{},
})
if err != nil {
if meta.IsNoMatchError(err) {
log.Info("Job watch error. CRD might be missing. Please install CRD and restart katib-controller",
"CRD Group", gvk.Group, "CRD Version", gvk.Version, "CRD Kind", gvk.Kind)
continue
}
return err
}
log.Info("Job watch added successfully",
"CRD Group", gvk.Group, "CRD Version", gvk.Version, "CRD Kind", gvk.Kind)
}
}

log.Info("Trial controller created")
return nil
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller.v1beta1/trial/trial_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
"github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -22,6 +23,7 @@ import (
commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
api_pb "github.com/kubeflow/katib/pkg/apis/manager/v1beta1"
"github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
trialutil "github.com/kubeflow/katib/pkg/controller.v1beta1/trial/util"
util "github.com/kubeflow/katib/pkg/controller.v1beta1/util"
managerclientmock "github.com/kubeflow/katib/pkg/mock/v1beta1/trial/managerclient"
Expand Down Expand Up @@ -50,6 +52,22 @@ func TestAdd(t *testing.T) {
mgr, err := manager.New(cfg, manager.Options{})
g.Expect(err).NotTo(gomega.HaveOccurred())

// Set fake trial resources
trialResources := trialutil.GvkListFlag{
{
Group: "kubeflow.org",
Version: "v1",
Kind: "TFJob",
},
{
Group: "kubeflow.org",
Version: "v1",
Kind: "MPIJob",
},
}

viper.Set(consts.ConfigTrialResources, trialResources)

// Test - Try to add Trial controller to the manager
g.Expect(Add(mgr)).NotTo(gomega.HaveOccurred())
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/controller.v1beta1/trial/util/flag_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package util

import (
"fmt"
"strings"

"k8s.io/apimachinery/pkg/runtime/schema"
)

// GvkListFlag is the custom flag to parse GroupVersionKind list for trial resources.
type GvkListFlag []schema.GroupVersionKind

// Set is the method to convert gvk to string value
func (flag *GvkListFlag) String() string {
gvkStrings := []string{}
for _, x := range []schema.GroupVersionKind(*flag) {
gvkStrings = append(gvkStrings, x.String())
}
return strings.Join(gvkStrings, ",")
}

// Set is the method to set gvk from string flag value
func (flag *GvkListFlag) Set(value string) error {
gvk, _ := schema.ParseKindArg(value)
if gvk == nil {
return fmt.Errorf("Invalid GroupVersionKind: %v", value)
}
*flag = append(*flag, *gvk)
return nil
}
2 changes: 1 addition & 1 deletion pkg/controller.v1beta1/trial/util/prometheus_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package trial
package util

import (
"context"
Expand Down

0 comments on commit ef6557a

Please sign in to comment.