Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable entrypoint for kubernetes pod executor #740

Merged
merged 8 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions client/cmd/plugin/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ func (v *validateCommand) validateFile(pluginPath string) error {
if filepath.Ext(pluginPath) != ".yaml" {
return errors.New("expecting .yaml file at " + pluginPath)
}
_, err := yaml.NewPluginSpec(pluginPath)
pluginSpec, err := yaml.NewPluginSpec(pluginPath)
if err != nil {
return err
}
return nil

return pluginSpec.Info.Validate()
}

func (v *validateCommand) validateDir(pluginPath string) error {
Expand Down
29 changes: 1 addition & 28 deletions ext/scheduler/airflow/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,7 @@ def lookup_non_standard_cron_expression(expr: str) -> str:


class SuperKubernetesPodOperator(KubernetesPodOperator):
def __init__(self,
optimus_hostname,
optimus_projectname,
optimus_namespacename,
optimus_jobname,
optimus_jobtype,
optimus_instancename,
*args,
**kwargs):
def __init__(self, *args, **kwargs):
super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs)
self.do_xcom_push = kwargs.get('do_xcom_push')
self.namespace = kwargs.get('namespace')
Expand All @@ -69,32 +61,13 @@ def __init__(self,
self.reattach_on_restart = kwargs.get('reattach_on_restart')
self.config_file = kwargs.get('config_file')

# used to fetch job env from optimus for adding to k8s pod
self.optimus_hostname = optimus_hostname
self.optimus_namespacename = optimus_namespacename
self.optimus_jobname = optimus_jobname
self.optimus_instancename = optimus_instancename
self.optimus_projectname = optimus_projectname
self.optimus_jobtype = optimus_jobtype
self._optimus_client = OptimusAPIClient(optimus_hostname)

def render_init_containers(self, context):
for ic in self.init_containers:
env = getattr(ic, 'env')
if env:
self.render_template(env, context)

def fetch_env_from_optimus(self, context):
scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT)
job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype, self.optimus_instancename)
return [
k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["envs"].items()
] + [
k8s.V1EnvVar(name=key, value=val) for key, val in job_meta["secrets"].items()
]

def execute(self, context):
self.env_vars += self.fetch_env_from_optimus(context)
self.render_init_containers(context)
# call KubernetesPodOperator to handle the pod
return super().execute(context)
Expand Down
28 changes: 16 additions & 12 deletions ext/scheduler/airflow/dag/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,32 +157,36 @@ func (m mockPluginRepo) GetByName(name string) (*plugin.Plugin, error) {
func setupPluginRepo() mockPluginRepo {
execUnit := new(mock.YamlMod)
execUnit.On("PluginInfo").Return(&plugin.Info{
Name: "bq-bq",
Image: "example.io/namespace/bq2bq-executor:latest",
Name: "bq-bq",
Image: "example.io/namespace/bq2bq-executor:latest",
Entrypoint: "python3 /opt/bumblebee/main.py",
}, nil)

transporterHook := "transporter"
hookUnit := new(mock.YamlMod)
hookUnit.On("PluginInfo").Return(&plugin.Info{
Name: transporterHook,
HookType: plugin.HookTypePre,
Image: "example.io/namespace/transporter-executor:latest",
DependsOn: []string{"predator"},
Name: transporterHook,
HookType: plugin.HookTypePre,
Image: "example.io/namespace/transporter-executor:latest",
Entrypoint: "java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main",
DependsOn: []string{"predator"},
}, nil)

predatorHook := "predator"
hookUnit2 := new(mock.YamlMod)
hookUnit2.On("PluginInfo").Return(&plugin.Info{
Name: predatorHook,
HookType: plugin.HookTypePost,
Image: "example.io/namespace/predator-image:latest",
Name: predatorHook,
HookType: plugin.HookTypePost,
Image: "example.io/namespace/predator-image:latest",
Entrypoint: "predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u \"${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}\"",
}, nil)

hookUnit3 := new(mock.YamlMod)
hookUnit3.On("PluginInfo").Return(&plugin.Info{
Name: "failureHook",
HookType: plugin.HookTypeFail,
Image: "example.io/namespace/failure-hook-image:latest",
Name: "failureHook",
HookType: plugin.HookTypeFail,
Image: "example.io/namespace/failure-hook-image:latest",
Entrypoint: "sleep 5",
}, nil)

repo := mockPluginRepo{plugins: []*plugin.Plugin{
Expand Down
25 changes: 11 additions & 14 deletions ext/scheduler/airflow/dag/dag.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ IMAGE_PULL_POLICY = "IfNotPresent"
INIT_CONTAINER_IMAGE = "odpf/optimus:{{.Version}}"
INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh"

def get_entrypoint_cmd(plugin_entrypoint):
path_config = JOB_DIR + "/in/.env"
path_secret = JOB_DIR + "/in/.secret"
entrypoint = "set -o allexport; source {path_config}; set +o allexport; cat {path_config}; ".format(path_config=path_config)
entrypoint += "set -o allexport; source {path_secret}; set +o allexport; ".format(path_secret=path_secret)
return entrypoint + plugin_entrypoint

volume = k8s.V1Volume(
name='asset-volume',
empty_dir=k8s.V1EmptyDirVolumeSource()
Expand Down Expand Up @@ -140,16 +147,11 @@ init_container = k8s.V1Container(

{{ $transformationName := print "transformation_" .Task.Name | DisplayName -}}
{{$transformationName}} = SuperKubernetesPodOperator(
optimus_hostname="{{$.Hostname}}",
optimus_projectname="{{$.Tenant.ProjectName.String}}",
optimus_namespacename="{{$.Tenant.NamespaceName.String}}",
optimus_jobname="{{.JobDetails.Name}}",
optimus_jobtype="{{$.ExecutorTask}}",
optimus_instancename="{{.Task.Name}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image={{ .Task.Image | quote}},
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""{{.Task.Entrypoint}} """)],
name="{{ .Task.Name | replace "_" "-" }}",
deryrahman marked this conversation as resolved.
Show resolved Hide resolved
task_id={{ .Task.Name | quote}},
get_logs=True,
Expand Down Expand Up @@ -188,16 +190,11 @@ init_container_{{$hookName}} = k8s.V1Container(
)

hook_{{$hookName}} = SuperKubernetesPodOperator(
optimus_instancename="{{$hookName}}",
optimus_hostname="{{$.Hostname}}",
optimus_projectname="{{$.Tenant.ProjectName.String}}",
optimus_namespacename="{{$.Tenant.NamespaceName.String}}",
optimus_jobname="{{$.JobDetails.Name}}",
optimus_jobtype="{{$.ExecutorHook}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="{{ $t.Image }}",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""{{ $t.Entrypoint }} """)],
name="hook_{{ $t.Name | replace "_" "-" }}",
task_id="hook_{{ $t.Name }}",
get_logs=True,
Expand Down
43 changes: 15 additions & 28 deletions ext/scheduler/airflow/dag/expected_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@
INIT_CONTAINER_IMAGE = "odpf/optimus:dev"
INIT_CONTAINER_ENTRYPOINT = "/opt/entrypoint_init_container.sh"

def get_entrypoint_cmd(plugin_entrypoint):
path_config = JOB_DIR + "/in/.env"
path_secret = JOB_DIR + "/in/.secret"
entrypoint = "set -o allexport; source {path_config}; set +o allexport; cat {path_config}; ".format(path_config=path_config)
entrypoint += "set -o allexport; source {path_secret}; set +o allexport; ".format(path_secret=path_secret)
return entrypoint + plugin_entrypoint

volume = k8s.V1Volume(
name='asset-volume',
empty_dir=k8s.V1EmptyDirVolumeSource()
Expand Down Expand Up @@ -109,16 +116,11 @@
)

transformation_bq__dash__bq = SuperKubernetesPodOperator(
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="task",
optimus_instancename="bq-bq",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/bq2bq-executor:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""python3 /opt/bumblebee/main.py """)],
name="bq-bq",
task_id="bq-bq",
get_logs=True,
Expand Down Expand Up @@ -151,16 +153,11 @@
)

hook_transporter = SuperKubernetesPodOperator(
optimus_instancename="transporter",
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="hook",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/transporter-executor:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""java -cp /opt/transporter/transporter.jar:/opt/transporter/jolokia-jvm-agent.jar -javaagent:jolokia-jvm-agent.jar=port=7777,host=0.0.0.0 com.gojek.transporter.Main """)],
name="hook_transporter",
task_id="hook_transporter",
get_logs=True,
Expand Down Expand Up @@ -189,16 +186,11 @@
)

hook_predator = SuperKubernetesPodOperator(
optimus_instancename="predator",
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="hook",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/predator-image:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""predator ${SUB_COMMAND} -s ${PREDATOR_URL} -u "${BQ_PROJECT}.${BQ_DATASET}.${BQ_TABLE}" """)],
name="hook_predator",
task_id="hook_predator",
get_logs=True,
Expand Down Expand Up @@ -227,16 +219,11 @@
)

hook_failureHook = SuperKubernetesPodOperator(
optimus_instancename="failureHook",
optimus_hostname="http://optimus.example.com",
optimus_projectname="example-proj",
optimus_namespacename="billing",
optimus_jobname="infra.billing.weekly-status-reports",
optimus_jobtype="hook",
image_pull_policy=IMAGE_PULL_POLICY,
namespace=conf.get('kubernetes', 'namespace', fallback="default"),
image="example.io/namespace/failure-hook-image:latest",
cmds=[],
cmds=["/bin/sh"],
arguments=["-c", get_entrypoint_cmd("""sleep 5 """)],
name="hook_failureHook",
task_id="hook_failureHook",
get_logs=True,
Expand Down
16 changes: 10 additions & 6 deletions ext/scheduler/airflow/dag/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type TemplateContext struct {
}

type Task struct {
Name string
Image string
Name string
Image string
Entrypoint string
}

func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) {
Expand All @@ -44,14 +45,16 @@ func PrepareTask(job *scheduler.Job, pluginRepo PluginRepo) (Task, error) {
info := plugin.Info()

return Task{
Name: info.Name,
Image: info.Image,
Name: info.Name,
Image: info.Image,
Entrypoint: info.Entrypoint,
}, nil
}

// Hook holds the per-hook values that PrepareHooksForJob builds from a job's
// hook entry and the matching hook plugin's info.
type Hook struct {
// Name is the hook name taken from the job's hook entry.
Name string
// Image is copied from the hook plugin info's Image field.
Image string
// Entrypoint is copied from the hook plugin info's Entrypoint field.
Entrypoint string
// IsFailHook presumably marks hooks whose plugin type is the failure hook
// type; the assignment is not visible here — TODO confirm.
IsFailHook bool
}

Expand Down Expand Up @@ -81,8 +84,9 @@ func PrepareHooksForJob(job *scheduler.Job, pluginRepo PluginRepo) (Hooks, error

info := hook.Info()
hk := Hook{
Name: h.Name,
Image: info.Image,
Name: h.Name,
Image: info.Image,
Entrypoint: info.Entrypoint,
}
switch info.HookType {
case plugin.HookTypePre:
Expand Down
27 changes: 1 addition & 26 deletions internal/models/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *PluginRepository) GetHooks() []*plugin.Plugin {

func (s *PluginRepository) AddYaml(yamlMod plugin.YamlMod) error {
info := yamlMod.PluginInfo()
if err := validateYamlPluginInfo(info); err != nil {
if err := info.Validate(); err != nil {
return err
}

Expand Down Expand Up @@ -103,31 +103,6 @@ func (s *PluginRepository) AddBinary(drMod plugin.DependencyResolverMod) error {
return nil
}

// validateYamlPluginInfo verifies that a YAML plugin's info carries the
// required name, image, and version, and that its type is either task or
// hook. It returns an error for the first check that fails, or nil when every
// check passes.
func validateYamlPluginInfo(info *plugin.Info) error {
	// Required fields are checked in a fixed order: name, image, version.
	switch {
	case info.Name == "":
		return errors.New("plugin name cannot be empty")
	case info.Image == "":
		return errors.New("plugin image cannot be empty")
	case info.PluginVersion == "":
		return errors.New("plugin version cannot be empty")
	}

	// Any plugin type other than task or hook is rejected.
	if info.PluginType != plugin.TypeTask && info.PluginType != plugin.TypeHook {
		return ErrUnsupportedPlugin
	}
	return nil
}

// NewPluginRepository returns a pointer to a PluginRepository whose data map
// is allocated and empty.
func NewPluginRepository() *PluginRepository {
	repo := new(PluginRepository)
	repo.data = make(map[string]*plugin.Plugin)
	return repo
}
3 changes: 2 additions & 1 deletion plugin/yaml/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (p *PluginSpec) PluginInfo() *plugin.Info {
Name: p.Name,
Description: p.Description,
Image: p.Image,
Entrypoint: p.Entrypoint,
PluginType: p.PluginType,
PluginMods: []plugin.Mod{plugin.ModTypeCLI},
PluginVersion: p.PluginVersion,
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewPluginSpec(pluginPath string) (*PluginSpec, error) {
return nil, err
}
var plugin PluginSpec
if err := yaml.Unmarshal(pluginBytes, &plugin); err != nil { // TODO: check if strict marshal is required
if err := yaml.UnmarshalStrict(pluginBytes, &plugin); err != nil {
return &plugin, err
}
return &plugin, nil
Expand Down
Loading