From c42e0c918b3b3196d499aee135e59da7bebb0441 Mon Sep 17 00:00:00 2001 From: Dery Rahman Ahaddienata Date: Fri, 24 Feb 2023 10:43:44 +0700 Subject: [PATCH] feat: configurable endpoint for kubernetes pod executor (#740) * feat: add entrypoint feature on plugins * remove: fetch env and secret from api * feat: return info with entrypoint * remove: unecessary param on dag template * test: fix test * refactor: use get entrypoint func * feat: make entrypoint as required * feat: validate plugin yaml info --- client/cmd/plugin/validate.go | 5 +- ext/scheduler/airflow/__lib.py | 29 +------ ext/scheduler/airflow/dag/compiler_test.go | 28 +++--- ext/scheduler/airflow/dag/dag.py.tmpl | 25 +++--- ext/scheduler/airflow/dag/expected_dag.py | 43 ++++----- ext/scheduler/airflow/dag/models.go | 16 ++-- internal/models/plugin.go | 27 +----- plugin/yaml/plugin.go | 3 +- plugin/yaml/plugin_test.go | 28 ++++-- plugin/yaml/tests/sample_plugin.yaml | 2 +- .../tests/sample_plugin_schema_invalid.yaml | 1 - .../tests/sample_plugin_without_version.yaml | 2 +- sdk/plugin/mock/plugin.go | 1 + sdk/plugin/plugin.go | 34 ++++++++ sdk/plugin/plugin_test.go | 87 +++++++++++++++++++ 15 files changed, 205 insertions(+), 126 deletions(-) diff --git a/client/cmd/plugin/validate.go b/client/cmd/plugin/validate.go index 58cf6bdb30..03e2632bc8 100644 --- a/client/cmd/plugin/validate.go +++ b/client/cmd/plugin/validate.go @@ -39,11 +39,12 @@ func (v *validateCommand) validateFile(pluginPath string) error { if filepath.Ext(pluginPath) != ".yaml" { return errors.New("expecting .yaml file at " + pluginPath) } - _, err := yaml.NewPluginSpec(pluginPath) + pluginSpec, err := yaml.NewPluginSpec(pluginPath) if err != nil { return err } - return nil + + return pluginSpec.Info.Validate() } func (v *validateCommand) validateDir(pluginPath string) error { diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index eb78d6b93a..3dbc547e09 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -52,15 +52,7 @@ def lookup_non_standard_cron_expression(expr: str) -> str: class SuperKubernetesPodOperator(KubernetesPodOperator): - def __init__(self, - optimus_hostname, - optimus_projectname, - optimus_namespacename, - optimus_jobname, - optimus_jobtype, - optimus_instancename, - *args, - **kwargs): + def __init__(self, *args, **kwargs): super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs) self.do_xcom_push = kwargs.get('do_xcom_push') self.namespace = kwargs.get('namespace') @@ -69,32 +61,13 @@ def __init__(self, self.reattach_on_restart = kwargs.get('reattach_on_restart') self.config_file = kwargs.get('config_file') - # used to fetch job env from optimus for adding to k8s pod - self.optimus_hostname = optimus_hostname - self.optimus_namespacename = optimus_namespacename - self.optimus_jobname = optimus_jobname - self.optimus_instancename = optimus_instancename - self.optimus_projectname = optimus_projectname - self.optimus_jobtype = optimus_jobtype - self._optimus_client = OptimusAPIClient(optimus_hostname) - def render_init_containers(self, context): for ic in self.init_containers: env = getattr(ic, 'env') if env: self.render_template(env, context) - def fetch_env_from_optimus(self, context): - scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT) - job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype, self.optimus_instancename) - return [ - k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["envs"].items() - ] + [ - k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["secrets"].items() - ] - def execute(self, context): - self.env_vars += self.fetch_env_from_optimus(context) self.render_init_containers(context) # call KubernetesPodOperator to handle the pod return super().execute(context) diff --git a/ext/scheduler/airflow/dag/compiler_test.go b/ext/scheduler/airflow/dag/compiler_test.go index 4b24329e5e..a7a1e463ca 100644 --- a/ext/scheduler/airflow/dag/compiler_test.go +++ b/ext/scheduler/airflow/dag/compiler_test.go @@ -157,32 +157,36 @@ func (m mockPluginRepo) GetByName(name string) (*plugin.Plugin, error) { func setupPluginRepo() mockPluginRepo { execUnit := new(mock.YamlMod) execUnit.On("PluginInfo").Return(&plugin.Info{ - Name: "bq-bq", - Image: "example.io/namespace/bq2bq-executor:latest", + Name: "bq-bq", + Image: "example.io/namespace/bq2bq-executor:latest", + Entrypoint: "python3 /opt/bumblebee/main.py", }, nil) transporterHook := "transporter" hookUnit := new(mock.YamlMod) hookUnit.On("PluginInfo").Return(&plugin.Info{ - Name: transporterHook, - HookType: plugin.HookTypePre, - Image: "example.io/namespace/transporter-executor:latest", - DependsOn: []string{"predator"}, + Name: transporterHook, + HookType: plugin.HookTypePre, + Image: "example.io/namespace/transporter-executor:latest", + Entrypoint: "java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main", + DependsOn: []string{"predator"}, }, nil) predatorHook := "predator" hookUnit2 := new(mock.YamlMod) hookUnit2.On("PluginInfo").Return(&plugin.Info{ - Name: predatorHook, - HookType: plugin.HookTypePost, - Image: "example.io/namespace/predator-image:latest", + Name: predatorHook, + HookType: plugin.HookTypePost, + Image: "example.io/namespace/predator-image:latest", + Entrypoint: "predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u \"${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}\"", }, nil) hookUnit3 := new(mock.YamlMod) hookUnit3.On("PluginInfo").Return(&plugin.Info{ - Name: "failureHook", - HookType: plugin.HookTypeFail, - Image: "example.io/namespace/failure-hook-image:latest", + Name: "failureHook", + HookType: plugin.HookTypeFail, + Image: "example.io/namespace/failure-hook-image:latest", + Entrypoint: "sleep 5", }, nil) repo := mockPluginRepo{plugins: []*plugin.Plugin{ diff --git a/ext/scheduler/airflow/dag/dag.py.tmpl b/ext/scheduler/airflow/dag/dag.py.tmpl index 133daecfd3..426e5c0d82 100644 --- a/ext/scheduler/airflow/dag/dag.py.tmpl +++ b/ext/scheduler/airflow/dag/dag.py.tmpl @@ -102,6 +102,13 @@ IMAGE_PULL_POLICY = "IfNotPresent" INIT_CONTAINER_IMAGE = "odpf/optimus:{{.Version}}" INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh" +def get_entrypoint_cmd(plugin_entrypoint): + path_config = JOB_DIR + "/in/.env" + path_secret = JOB_DIR + "/in/.secret" + entrypoint = "set -o allexport; source {path_config}; set +o allexport; cat {path_config}; ".format(path_config=path_config) + entrypoint += "set -o allexport; source {path_secret}; set +o allexport; ".format(path_secret=path_secret) + return entrypoint + plugin_entrypoint + volume = k8s.V1Volume( name='asset-volume', empty_dir=k8s.V1EmptyDirVolumeSource() @@ -140,16 +147,11 @@ init_container = k8s.V1Container( {{ $transformationName := print "transformation_" .Task.Name | DisplayName -}} {{$transformationName}} = SuperKubernetesPodOperator( - optimus_hostname="{{$.Hostname}}", - optimus_projectname="{{$.Tenant.ProjectName.String}}", - optimus_namespacename="{{$.Tenant.NamespaceName.String}}", - optimus_jobname="{{.JobDetails.Name}}", - optimus_jobtype="{{$.ExecutorTask}}", - optimus_instancename="{{.Task.Name}}", image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image={{ .Task.Image | quote}}, - cmds=[], + cmds=["/bin/sh"], + arguments=["-c", get_entrypoint_cmd("""{{.Task.Entrypoint}} """)], name="{{ .Task.Name | replace "_" "-" }}", task_id={{ .Task.Name | quote}}, get_logs=True, @@ -188,16 +190,11 @@ init_container_{{$hookName}} = k8s.V1Container( ) hook_{{$hookName}} = SuperKubernetesPodOperator( - optimus_instancename="{{$hookName}}", - optimus_hostname="{{$.Hostname}}", - optimus_projectname="{{$.Tenant.ProjectName.String}}", - optimus_namespacename="{{$.Tenant.NamespaceName.String}}", - optimus_jobname="{{$.JobDetails.Name}}", - optimus_jobtype="{{$.ExecutorHook}}", image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="{{ $t.Image }}", - cmds=[], + cmds=["/bin/sh"], + arguments=["-c", get_entrypoint_cmd("""{{ $t.Entrypoint }} """)], name="hook_{{ $t.Name | replace "_" "-" }}", task_id="hook_{{ $t.Name }}", get_logs=True, diff --git a/ext/scheduler/airflow/dag/expected_dag.py b/ext/scheduler/airflow/dag/expected_dag.py index 94179f1bec..a51ef117aa 100644 --- a/ext/scheduler/airflow/dag/expected_dag.py +++ b/ext/scheduler/airflow/dag/expected_dag.py @@ -72,6 +72,13 @@ INIT_CONTAINER_IMAGE = "odpf/optimus:dev" INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh" +def get_entrypoint_cmd(plugin_entrypoint): + path_config = JOB_DIR + "/in/.env" + path_secret = JOB_DIR + "/in/.secret" + entrypoint = "set -o allexport; source {path_config}; set +o allexport; cat {path_config}; ".format(path_config=path_config) + entrypoint += "set -o allexport; source {path_secret}; set +o allexport; ".format(path_secret=path_secret) + return entrypoint + plugin_entrypoint + volume = k8s.V1Volume( name='asset-volume', empty_dir=k8s.V1EmptyDirVolumeSource() @@ -109,16 +116,11 @@ ) transformation_bq__dash__bq = SuperKubernetesPodOperator( - optimus_hostname="http://optimus.example.com", - optimus_projectname="example-proj", - optimus_namespacename="billing", - optimus_jobname="infra.billing.weekly-status-reports", - optimus_jobtype="task", - optimus_instancename="bq-bq", image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/bq2bq-executor:latest", - cmds=[], + cmds=["/bin/sh"], + arguments=["-c", get_entrypoint_cmd("""python3 /opt/bumblebee/main.py """)], name="bq-bq", task_id="bq-bq", get_logs=True, @@ -151,16 +153,11 @@ ) hook_transporter = SuperKubernetesPodOperator( - optimus_instancename="transporter", - optimus_hostname="http://optimus.example.com", - optimus_projectname="example-proj", - optimus_namespacename="billing", - optimus_jobname="infra.billing.weekly-status-reports", - optimus_jobtype="hook", image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/transporter-executor:latest", - cmds=[], + cmds=["/bin/sh"], + arguments=["-c", get_entrypoint_cmd("""java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main """)], name="hook_transporter", task_id="hook_transporter", get_logs=True, @@ -189,16 +186,11 @@ ) hook_predator = SuperKubernetesPodOperator( - optimus_instancename="predator", - optimus_hostname="http://optimus.example.com", - optimus_projectname="example-proj", - optimus_namespacename="billing", - optimus_jobname="infra.billing.weekly-status-reports", - optimus_jobtype="hook", image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/predator-image:latest", - cmds=[], + cmds=["/bin/sh"], + arguments=["-c", get_entrypoint_cmd("""predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u "${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}" """)], name="hook_predator", task_id="hook_predator", get_logs=True, @@ -227,16 +219,11 @@ ) hook_failureHook = SuperKubernetesPodOperator( - optimus_instancename="failureHook", - optimus_hostname="http://optimus.example.com", - optimus_projectname="example-proj", - optimus_namespacename="billing", - optimus_jobname="infra.billing.weekly-status-reports", - optimus_jobtype="hook", image_pull_policy=IMAGE_PULL_POLICY, namespace=conf.get('kubernetes', 'namespace', fallback="default"), image="example.io/namespace/failure-hook-image:latest", - cmds=[], + cmds=["/bin/sh"], + arguments=["-c", get_entrypoint_cmd("""sleep 5 """)], name="hook_failureHook", task_id="hook_failureHook", get_logs=True, diff --git a/ext/scheduler/airflow/dag/models.go b/ext/scheduler/airflow/dag/models.go index 2909e9113e..0c0b2886f5 100644 --- a/ext/scheduler/airflow/dag/models.go +++ b/ext/scheduler/airflow/dag/models.go @@ -31,8 +31,9 @@ type TemplateContext struct { } type Task struct { - Name string - Image string + Name string + Image string + Entrypoint string } func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) { @@ -44,14 +45,16 @@ func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) { info := plugin.Info() return Task{ - Name: info.Name, - Image: info.Image, + Name: info.Name, + Image: info.Image, + Entrypoint: info.Entrypoint, }, nil } type Hook struct { Name string Image string + Entrypoint string IsFailHook bool } @@ -81,8 +84,9 @@ func PrepareHooksForJob(job *scheduler.Job, pluginRepo PluginRepo) (Hooks, error info := hook.Info() hk := Hook{ - Name: h.Name, - Image: info.Image, + Name: h.Name, + Image: info.Image, + Entrypoint: info.Entrypoint, } switch info.HookType { case plugin.HookTypePre: diff --git a/internal/models/plugin.go b/internal/models/plugin.go index 0ddf26a24d..154e350981 100644 --- a/internal/models/plugin.go +++ b/internal/models/plugin.go @@ -72,7 +72,7 @@ func (s *PluginRepository) GetHooks() []*plugin.Plugin { func (s *PluginRepository) AddYaml(yamlMod plugin.YamlMod) error { info := yamlMod.PluginInfo() - if err := validateYamlPluginInfo(info); err != nil { + if err := info.Validate(); err != nil { return err } @@ -103,31 +103,6 @@ func (s *PluginRepository) AddBinary(drMod plugin.DependencyResolverMod) error { return nil } -func validateYamlPluginInfo(info *plugin.Info) error { - if info.Name == "" { - return errors.New("plugin name cannot be empty") - } - - // image is a required field - if info.Image == "" { - return errors.New("plugin image cannot be empty") - } - - // version is a required field - if info.PluginVersion == "" { - return errors.New("plugin version cannot be empty") - } - - switch info.PluginType { - case plugin.TypeTask: - case plugin.TypeHook: - default: - return ErrUnsupportedPlugin - } - - return nil -} - func NewPluginRepository() *PluginRepository { return &PluginRepository{data: map[string]*plugin.Plugin{}} } diff --git a/plugin/yaml/plugin.go b/plugin/yaml/plugin.go index 8187bd193b..3b35c1f1cf 100644 --- a/plugin/yaml/plugin.go +++ b/plugin/yaml/plugin.go @@ -31,6 +31,7 @@ func (p *PluginSpec) PluginInfo() *plugin.Info { Name: p.Name, Description: p.Description, Image: p.Image, + Entrypoint: p.Entrypoint, PluginType: p.PluginType, PluginMods: []plugin.Mod{plugin.ModTypeCLI}, PluginVersion: p.PluginVersion, @@ -100,7 +101,7 @@ func NewPluginSpec(pluginPath string) (*PluginSpec, error) { return nil, err } var plugin PluginSpec - if err := yaml.Unmarshal(pluginBytes, &plugin); err != nil { // TODO: check if strict marshal is required + if err := yaml.UnmarshalStrict(pluginBytes, &plugin); err != nil { return &plugin, err } return &plugin, nil diff --git a/plugin/yaml/plugin_test.go b/plugin/yaml/plugin_test.go index a80b5b2623..90847d5109 100644 --- a/plugin/yaml/plugin_test.go +++ b/plugin/yaml/plugin_test.go @@ -16,6 +16,7 @@ import ( type mockYamlMod struct { Name string Image string + Entrypoint string PluginVersion string PluginType string } @@ -24,6 +25,7 @@ func (p *mockYamlMod) PluginInfo() *plugin.Info { return &plugin.Info{ Name: p.Name, Image: p.Image, + Entrypoint: p.Entrypoint, PluginVersion: p.PluginVersion, PluginType: plugin.Type(p.PluginType), } @@ -52,6 +54,7 @@ func TestYamlPlugin(t *testing.T) { Name: "bq2bqtest", Description: "Testing", Image: "docker.io/odpf/optimus-task-bq2bq-executor:latest", + Entrypoint: "sleep 100", PluginType: "task", PluginMods: []plugin.Mod{"cli"}, PluginVersion: "latest", @@ -170,6 +173,7 @@ func TestYamlPlugin(t *testing.T) { err := repoWithBinayPlugin.AddYaml(&mockYamlMod{ Name: testYamlPluginName, Image: "sdsd", + Entrypoint: "sleep 100", PluginVersion: "asdasd", PluginType: plugin.TypeTask.String(), }) @@ -192,16 +196,28 @@ func TestYamlPlugin(t *testing.T) { repoPlugins := repoWithBinayPlugin.GetAll() assert.Len(t, repoPlugins, 1) }) - t.Run("should not load yaml plugin for invalid paths or yaml", func(t *testing.T) { + t.Run("should not load yaml plugin for invalid paths", func(t *testing.T) { repo := models.NewPluginRepository() - invalidPluginPaths := []string{ - "tests/notpresent.yaml", - "tests/sample_plugin_without_version.yaml", - "tests/sample_plugin_schema_invalid.yaml", - } + invalidPluginPaths := []string{"tests/notpresent.yaml"} err := yaml.Init(repo, invalidPluginPaths, pluginLogger) assert.Error(t, err) assert.Empty(t, repo.GetAll()) }) + t.Run("should not load yaml plugin for invalid yaml", func(t *testing.T) { + t.Run("version not present", func(t *testing.T) { + repo := models.NewPluginRepository() + invalidPluginPaths := []string{"tests/sample_plugin_without_version.yaml"} + err := yaml.Init(repo, invalidPluginPaths, pluginLogger) + assert.Error(t, err) + assert.Empty(t, repo.GetAll()) + }) + t.Run("schema invalid", func(t *testing.T) { + repo := models.NewPluginRepository() + invalidPluginPaths := []string{"tests/sample_plugin_schema_invalid.yaml"} + err := yaml.Init(repo, invalidPluginPaths, pluginLogger) + assert.Error(t, err) + assert.Empty(t, repo.GetAll()) + }) + }) }) } diff --git a/plugin/yaml/tests/sample_plugin.yaml b/plugin/yaml/tests/sample_plugin.yaml index aa18fc9ddc..af79c0e279 100644 --- a/plugin/yaml/tests/sample_plugin.yaml +++ b/plugin/yaml/tests/sample_plugin.yaml @@ -6,7 +6,7 @@ pluginmods: - dependencyresolver pluginversion: latest image: docker.io/odpf/optimus-task-bq2bq-executor:latest -secretpath: /tmp/auth.json +entrypoint: "sleep 100" questions: - name: PROJECT diff --git a/plugin/yaml/tests/sample_plugin_schema_invalid.yaml b/plugin/yaml/tests/sample_plugin_schema_invalid.yaml index d2a1fdecd2..24aa05f137 100644 --- a/plugin/yaml/tests/sample_plugin_schema_invalid.yaml +++ b/plugin/yaml/tests/sample_plugin_schema_invalid.yaml @@ -5,7 +5,6 @@ pluginmods: - cli # pluginversion: "" # image: "" -secretpath: /tmp/auth.json questions: - name: PROJECT diff --git a/plugin/yaml/tests/sample_plugin_without_version.yaml b/plugin/yaml/tests/sample_plugin_without_version.yaml index 83cde51487..5d8f0a784d 100644 --- a/plugin/yaml/tests/sample_plugin_without_version.yaml +++ b/plugin/yaml/tests/sample_plugin_without_version.yaml @@ -5,7 +5,7 @@ pluginmods: - cli pluginversion: "" image: "" -secretpath: /tmp/auth.json +entrypoint: "sleep 100" questions: - name: PROJECT diff --git a/sdk/plugin/mock/plugin.go b/sdk/plugin/mock/plugin.go index 68c4e6887b..52cc9226ba 100644 --- a/sdk/plugin/mock/plugin.go +++ b/sdk/plugin/mock/plugin.go @@ -34,6 +34,7 @@ func (p *MockYamlMod) PluginInfo() *plugin.Info { DependsOn: nil, HookType: "", Image: "gcr.io/bq-plugin:dev", + Entrypoint: "sleep 60", PluginMods: []plugin.Mod{plugin.ModTypeCLI}, } } diff --git a/sdk/plugin/plugin.go b/sdk/plugin/plugin.go index 785ba3fcb5..0848ba3d54 100644 --- a/sdk/plugin/plugin.go +++ b/sdk/plugin/plugin.go @@ -1,6 +1,7 @@ package plugin import ( + "errors" "strings" ) @@ -50,6 +51,9 @@ type Info struct { // Image is the full path to docker container that will be scheduled for execution Image string + // Entrypoint command which will be used to execute the plugin + Entrypoint string + // DependsOn returns list of hooks this should be executed after DependsOn []string `yaml:",omitempty"` @@ -58,6 +62,36 @@ type Info struct { HookType HookType `yaml:",omitempty"` } +func (info *Info) Validate() error { + if info.Name == "" { + return errors.New("plugin name cannot be empty") + } + + // image is a required field + if info.Image == "" { + return errors.New("plugin image cannot be empty") + } + + // version is a required field + if info.PluginVersion == "" { + return errors.New("plugin version cannot be empty") + } + + // entrypoint is a required field + if info.Entrypoint == "" { + return errors.New("entrypoint cannot be empty") + } + + switch info.PluginType { + case TypeTask: + case TypeHook: + default: + return errors.New("plugin type is not supported") + } + + return nil +} + type YamlMod interface { PluginInfo() *Info CommandLineMod diff --git a/sdk/plugin/plugin_test.go b/sdk/plugin/plugin_test.go index 2e54d0fa76..ae8068c2ee 100644 --- a/sdk/plugin/plugin_test.go +++ b/sdk/plugin/plugin_test.go @@ -1,6 +1,7 @@ package plugin_test import ( + "errors" "testing" "github.com/stretchr/testify/assert" @@ -34,6 +35,92 @@ func TestPlugins(t *testing.T) { }) }) + t.Run("Info", func(t *testing.T) { + t.Run("Validate", func(t *testing.T) { + testCases := []struct { + name string + err error + info plugin.Info + }{ + { + name: "when name is empty", + err: errors.New("plugin name cannot be empty"), + info: plugin.Info{ + Name: "", + Image: "odpf.io/example", + PluginVersion: "0.2", + Entrypoint: "sleep 10", + PluginType: plugin.TypeTask, + }, + }, + { + name: "when image is empty", + err: errors.New("plugin image cannot be empty"), + info: plugin.Info{ + Name: "example", + Image: "", + PluginVersion: "0.2", + Entrypoint: "sleep 10", + PluginType: plugin.TypeTask, + }, + }, + { + name: "when plugin version is empty", + err: errors.New("plugin version cannot be empty"), + info: plugin.Info{ + Name: "example", + Image: "odpf.io/example", + PluginVersion: "", + Entrypoint: "sleep 10", + PluginType: plugin.TypeTask, + }, + }, + { + name: "when entrypoint is empty", + err: errors.New("entrypoint cannot be empty"), + info: plugin.Info{ + Name: "example", + Image: "odpf.io/example", + PluginVersion: "0.2", + Entrypoint: "", + PluginType: plugin.TypeTask, + }, + }, + { + name: "when plugin type is not supported", + err: errors.New("plugin type is not supported"), + info: plugin.Info{ + Name: "example", + Image: "odpf.io/example", + PluginVersion: "0.2", + Entrypoint: "sleep 10", + PluginType: "", + }, + }, + { + name: "when valid", + err: nil, + info: plugin.Info{ + Name: "example", + Image: "odpf.io/example", + PluginVersion: "0.2", + Entrypoint: "sleep 10", + PluginType: plugin.TypeTask, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + err := tc.info.Validate() + assert.Equal(t, tc.err, err) + }) + } + }) + }) + t.Run("ValidatorFactory", func(t *testing.T) { validator := plugin.ValidatorFactory.NewFromRegex(`^[a-z0-9_\-]+$`, "invalid string format") assert.Error(t, validator(23)) // input should be only string