Skip to content

Commit

Permalink
feat: configurable entrypoint for kubernetes pod executor (#740)
Browse files Browse the repository at this point in the history
* feat: add entrypoint feature on plugins

* remove: fetch env and secret from api

* feat: return info with entrypoint

* remove: unnecessary param on dag template

* test: fix test

* refactor: use get entrypoint func

* feat: make entrypoint as required

* feat: validate plugin yaml info
  • Loading branch information
deryrahman authored and sravankorumilli committed Feb 27, 2023
1 parent ad3c691 commit c42e0c9
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 126 deletions.
5 changes: 3 additions & 2 deletions client/cmd/plugin/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ func (v *validateCommand) validateFile(pluginPath string) error {
if filepath.Ext(pluginPath) != ".yaml" {
return errors.New("expecting .yaml file at " + pluginPath)
}
_, err := yaml.NewPluginSpec(pluginPath)
pluginSpec, err := yaml.NewPluginSpec(pluginPath)
if err != nil {
return err
}
return nil

return pluginSpec.Info.Validate()
}

func (v *validateCommand) validateDir(pluginPath string) error {
Expand Down
29 changes: 1 addition & 28 deletions ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,7 @@ def lookup_non_standard_cron_expression(expr: str) -> str:


class SuperKubernetesPodOperator(KubernetesPodOperator):
def __init__(self,
optimus_hostname,
optimus_projectname,
optimus_namespacename,
optimus_jobname,
optimus_jobtype,
optimus_instancename,
*args,
**kwargs):
def __init__(self, *args, **kwargs):
super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs)
self.do_xcom_push = kwargs.get('do_xcom_push')
self.namespace = kwargs.get('namespace')
Expand All @@ -69,32 +61,13 @@ def __init__(self,
self.reattach_on_restart = kwargs.get('reattach_on_restart')
self.config_file = kwargs.get('config_file')

# used to fetch job env from optimus for adding to k8s pod
self.optimus_hostname = optimus_hostname
self.optimus_namespacename = optimus_namespacename
self.optimus_jobname = optimus_jobname
self.optimus_instancename = optimus_instancename
self.optimus_projectname = optimus_projectname
self.optimus_jobtype = optimus_jobtype
self._optimus_client = OptimusAPIClient(optimus_hostname)

def render_init_containers(self, context):
for ic in self.init_containers:
env = getattr(ic, 'env')
if env:
self.render_template(env, context)

def fetch_env_from_optimus(self, context):
scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT)
job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype, self.optimus_instancename)
return [
k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["envs"].items()
] + [
k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["secrets"].items()
]

def execute(self, context):
self.env_vars += self.fetch_env_from_optimus(context)
self.render_init_containers(context)
# call KubernetesPodOperator to handle the pod
return super().execute(context)
Expand Down
28 changes: 16 additions & 12 deletions ext/scheduler/airflow/dag/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,32 +157,36 @@ func (m mockPluginRepo) GetByName(name string) (*plugin.Plugin, error) {
func setupPluginRepo() mockPluginRepo {
execUnit := new(mock.YamlMod)
execUnit.On("PluginInfo").Return(&plugin.Info{
Name: "bq-bq",
Image: "example.io/namespace/bq2bq-executor:latest",
Name: "bq-bq",
Image: "example.io/namespace/bq2bq-executor:latest",
Entrypoint: "python3 /opt/bumblebee/main.py",
}, nil)

transporterHook := "transporter"
hookUnit := new(mock.YamlMod)
hookUnit.On("PluginInfo").Return(&plugin.Info{
Name: transporterHook,
HookType: plugin.HookTypePre,
Image: "example.io/namespace/transporter-executor:latest",
DependsOn: []string{"predator"},
Name: transporterHook,
HookType: plugin.HookTypePre,
Image: "example.io/namespace/transporter-executor:latest",
Entrypoint: "java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main",
DependsOn: []string{"predator"},
}, nil)

predatorHook := "predator"
hookUnit2 := new(mock.YamlMod)
hookUnit2.On("PluginInfo").Return(&plugin.Info{
Name: predatorHook,
HookType: plugin.HookTypePost,
Image: "example.io/namespace/predator-image:latest",
Name: predatorHook,
HookType: plugin.HookTypePost,
Image: "example.io/namespace/predator-image:latest",
Entrypoint: "predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u \"${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}\"",
}, nil)

hookUnit3 := new(mock.YamlMod)
hookUnit3.On("PluginInfo").Return(&plugin.Info{
Name: "failureHook",
HookType: plugin.HookTypeFail,
Image: "example.io/namespace/failure-hook-image:latest",
Name: "failureHook",
HookType: plugin.HookTypeFail,
Image: "example.io/namespace/failure-hook-image:latest",
Entrypoint: "sleep 5",
}, nil)

repo := mockPluginRepo{plugins: []*plugin.Plugin{
Expand Down
25 changes: 11 additions & 14 deletions ext/scheduler/airflow/dag/dag.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ IMAGE_PULL_POLICY = "IfNotPresent"
INIT_CONTAINER_IMAGE = "odpf/optimus:{{.Version}}"
INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh"

def get_entrypoint_cmd(plugin_entrypoint):
    """Build the shell command line that runs a plugin inside its pod.

    The returned string first exports every variable defined in
    ``JOB_DIR/in/.env`` and ``JOB_DIR/in/.secret`` (``allexport`` is switched
    on only while each file is loaded), prints the ``.env`` file (the secret
    file is never printed), and finally appends ``plugin_entrypoint``
    unchanged.

    The files are loaded with the POSIX ``.`` builtin instead of ``source``:
    the operators run this command through ``/bin/sh -c``, and ``source`` is
    not provided by every ``sh`` implementation (dash, for example).

    :param plugin_entrypoint: command declared by the plugin; appended as-is.
    :return: the full command string to pass after ``-c``.
    """
    path_config = JOB_DIR + "/in/.env"
    path_secret = JOB_DIR + "/in/.secret"
    # Same wrapper for both files: export everything the file defines.
    load_env = "set -o allexport; . {path}; set +o allexport; "
    entrypoint = load_env.format(path=path_config) + "cat {path}; ".format(path=path_config)
    entrypoint += load_env.format(path=path_secret)
    return entrypoint + plugin_entrypoint

volume = k8s.V1Volume(
name='asset-volume',
empty_dir=k8s.V1EmptyDirVolumeSource()
Expand Down Expand Up @@ -140,16 +147,11 @@ init_container = k8s.V1Container(

{{ $transformationName := print "transformation_" .Task.Name | DisplayName -}}
{{$transformationName}} = SuperKubernetesPodOperator(
optimus_hostname="{{$.Hostname}}",
optimus_projectname="{{$.Tenant.ProjectName.String}}",
optimus_namespacename="{{$.Tenant.NamespaceName.String}}",
optimus_jobname="{{.JobDetails.Name}}",
optimus_jobtype="{{$.ExecutorTask}}",
optimus_instancename="{{.Task.Name}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image={{ .Task.Image | quote}},
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""{{.Task.Entrypoint}} """)],
name="{{ .Task.Name | replace "_" "-" }}",
task_id={{ .Task.Name | quote}},
get_logs=True,
Expand Down Expand Up @@ -188,16 +190,11 @@ init_container_{{$hookName}} = k8s.V1Container(
)

hook_{{$hookName}} = SuperKubernetesPodOperator(
optimus_instancename="{{$hookName}}",
optimus_hostname="{{$.Hostname}}",
optimus_projectname="{{$.Tenant.ProjectName.String}}",
optimus_namespacename="{{$.Tenant.NamespaceName.String}}",
optimus_jobname="{{$.JobDetails.Name}}",
optimus_jobtype="{{$.ExecutorHook}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="{{ $t.Image }}",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""{{ $t.Entrypoint }} """)],
name="hook_{{ $t.Name | replace "_" "-" }}",
task_id="hook_{{ $t.Name }}",
get_logs=True,
Expand Down
43 changes: 15 additions & 28 deletions ext/scheduler/airflow/dag/expected_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
INIT_CONTAINER_IMAGE = "odpf/optimus:dev"
INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh"

def get_entrypoint_cmd(plugin_entrypoint):
    """Build the shell command line that runs a plugin inside its pod.

    The returned string first exports every variable defined in
    ``JOB_DIR/in/.env`` and ``JOB_DIR/in/.secret`` (``allexport`` is switched
    on only while each file is loaded), prints the ``.env`` file (the secret
    file is never printed), and finally appends ``plugin_entrypoint``
    unchanged.

    The files are loaded with the POSIX ``.`` builtin instead of ``source``:
    the operators run this command through ``/bin/sh -c``, and ``source`` is
    not provided by every ``sh`` implementation (dash, for example).

    :param plugin_entrypoint: command declared by the plugin; appended as-is.
    :return: the full command string to pass after ``-c``.
    """
    path_config = JOB_DIR + "/in/.env"
    path_secret = JOB_DIR + "/in/.secret"
    # Same wrapper for both files: export everything the file defines.
    load_env = "set -o allexport; . {path}; set +o allexport; "
    entrypoint = load_env.format(path=path_config) + "cat {path}; ".format(path=path_config)
    entrypoint += load_env.format(path=path_secret)
    return entrypoint + plugin_entrypoint

volume = k8s.V1Volume(
name='asset-volume',
empty_dir=k8s.V1EmptyDirVolumeSource()
Expand Down Expand Up @@ -109,16 +116,11 @@
)

transformation_bq__dash__bq = SuperKubernetesPodOperator(
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="task",
optimus_instancename="bq-bq",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/bq2bq-executor:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""python3 /opt/bumblebee/main.py """)],
name="bq-bq",
task_id="bq-bq",
get_logs=True,
Expand Down Expand Up @@ -151,16 +153,11 @@
)

hook_transporter = SuperKubernetesPodOperator(
optimus_instancename="transporter",
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="hook",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/transporter-executor:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main """)],
name="hook_transporter",
task_id="hook_transporter",
get_logs=True,
Expand Down Expand Up @@ -189,16 +186,11 @@
)

hook_predator = SuperKubernetesPodOperator(
optimus_instancename="predator",
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="hook",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/predator-image:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u "${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}" """)],
name="hook_predator",
task_id="hook_predator",
get_logs=True,
Expand Down Expand Up @@ -227,16 +219,11 @@
)

hook_failureHook = SuperKubernetesPodOperator(
optimus_instancename="failureHook",
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="hook",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/failure-hook-image:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""sleep 5 """)],
name="hook_failureHook",
task_id="hook_failureHook",
get_logs=True,
Expand Down
16 changes: 10 additions & 6 deletions ext/scheduler/airflow/dag/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type TemplateContext struct {
}

type Task struct {
Name string
Image string
Name string
Image string
Entrypoint string
}

func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) {
Expand All @@ -44,14 +45,16 @@ func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) {
info := plugin.Info()

return Task{
Name: info.Name,
Image: info.Image,
Name: info.Name,
Image: info.Image,
Entrypoint: info.Entrypoint,
}, nil
}

// Hook carries the per-hook values that PrepareHooksForJob collects for a job.
type Hook struct {
// Name is copied from the job's hook entry (h.Name), not from the plugin info.
Name string
// Image is the plugin info's Image value.
Image string
// Entrypoint is the plugin info's Entrypoint value.
Entrypoint string
// IsFailHook is not set where the Hook literal is built.
// NOTE(review): presumably set for plugin.HookTypeFail hooks in the
// HookType switch that follows — confirm against PrepareHooksForJob.
IsFailHook bool
}

Expand Down Expand Up @@ -81,8 +84,9 @@ func PrepareHooksForJob(job *scheduler.Job, pluginRepo PluginRepo) (Hooks, error

info := hook.Info()
hk := Hook{
Name: h.Name,
Image: info.Image,
Name: h.Name,
Image: info.Image,
Entrypoint: info.Entrypoint,
}
switch info.HookType {
case plugin.HookTypePre:
Expand Down
27 changes: 1 addition & 26 deletions internal/models/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *PluginRepository) GetHooks() []*plugin.Plugin {

func (s *PluginRepository) AddYaml(yamlMod plugin.YamlMod) error {
info := yamlMod.PluginInfo()
if err := validateYamlPluginInfo(info); err != nil {
if err := info.Validate(); err != nil {
return err
}

Expand Down Expand Up @@ -103,31 +103,6 @@ func (s *PluginRepository) AddBinary(drMod plugin.DependencyResolverMod) error {
return nil
}

// validateYamlPluginInfo reports the first problem found in a YAML plugin's
// info. Name, image and plugin version are all required, and the plugin type
// must be either plugin.TypeTask or plugin.TypeHook; any other type yields
// ErrUnsupportedPlugin. It returns nil when every check passes.
func validateYamlPluginInfo(info *plugin.Info) error {
	// Required fields, checked in a fixed order so the reported error is stable.
	switch {
	case info.Name == "":
		return errors.New("plugin name cannot be empty")
	case info.Image == "":
		return errors.New("plugin image cannot be empty")
	case info.PluginVersion == "":
		return errors.New("plugin version cannot be empty")
	}

	// Only task and hook plugins are accepted.
	if info.PluginType != plugin.TypeTask && info.PluginType != plugin.TypeHook {
		return ErrUnsupportedPlugin
	}
	return nil
}

// NewPluginRepository returns a PluginRepository whose data map is
// initialized and empty, so it can be written to immediately.
func NewPluginRepository() *PluginRepository {
	repo := new(PluginRepository)
	repo.data = make(map[string]*plugin.Plugin)
	return repo
}
3 changes: 2 additions & 1 deletion plugin/yaml/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (p *PluginSpec) PluginInfo() *plugin.Info {
Name: p.Name,
Description: p.Description,
Image: p.Image,
Entrypoint: p.Entrypoint,
PluginType: p.PluginType,
PluginMods: []plugin.Mod{plugin.ModTypeCLI},
PluginVersion: p.PluginVersion,
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewPluginSpec(pluginPath string) (*PluginSpec, error) {
return nil, err
}
var plugin PluginSpec
if err := yaml.Unmarshal(pluginBytes, &plugin); err != nil { // TODO: check if strict marshal is required
if err := yaml.UnmarshalStrict(pluginBytes, &plugin); err != nil {
return &plugin, err
}
return &plugin, nil
Expand Down
Loading

0 comments on commit c42e0c9

Please sign in to comment.