Skip to content

Commit

Permalink
feat: add gokart_worker configurations as same as luigi one
Browse files Browse the repository at this point in the history
  • Loading branch information
hiro-o918 committed Nov 6, 2024
1 parent 61fd81b commit 47b0b0b
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions gokart/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,56 @@ def run(self) -> None:
super(ContextManagedTaskProcess, self).run()


class gokart_worker(luigi.Config):
id = luigi.Parameter(default='', description='Override the auto-generated worker_id')
ping_interval = luigi.FloatParameter(default=1.0, config_path=dict(section='core', name='worker-ping-interval'))
keep_alive = luigi.BoolParameter(default=False, config_path=dict(section='core', name='worker-keep-alive'))
count_uniques = luigi.BoolParameter(
default=False,
config_path=dict(section='core', name='worker-count-uniques'),
description='worker-count-uniques means that we will keep a ' 'worker alive only if it has a unique pending task, as ' 'well as having keep-alive true',
)
count_last_scheduled = luigi.BoolParameter(
default=False, description='Keep a worker alive only if there are ' 'pending tasks which it was the last to ' 'schedule.'
)
wait_interval = luigi.FloatParameter(default=1.0, config_path=dict(section='core', name='worker-wait-interval'))
wait_jitter = luigi.FloatParameter(default=5.0)

max_keep_alive_idle_duration = luigi.TimeDeltaParameter(default=datetime.timedelta(0))

max_reschedules = luigi.IntParameter(default=1, config_path=dict(section='core', name='worker-max-reschedules'))
timeout = luigi.IntParameter(default=0, config_path=dict(section='core', name='worker-timeout'))
task_limit = luigi.IntParameter(default=None, config_path=dict(section='core', name='worker-task-limit'))
retry_external_tasks = luigi.BoolParameter(
default=False,
config_path=dict(section='core', name='retry-external-tasks'),
description='If true, incomplete external tasks will be ' 'retested for completion while Luigi is running.',
)
send_failure_email = luigi.BoolParameter(default=True, description='If true, send e-mails directly from the worker' 'on failure')
no_install_shutdown_handler = luigi.BoolParameter(default=False, description='If true, the SIGUSR1 shutdown handler will' 'NOT be install on the worker')
check_unfulfilled_deps = luigi.BoolParameter(default=True, description='If true, check for completeness of ' 'dependencies before running a task')
check_complete_on_run = luigi.BoolParameter(
default=False,
description='If true, only mark tasks as done after running if they are complete. '
'Regardless of this setting, the worker will always check if external '
'tasks are complete before marking them as done.',
)
force_multiprocessing = luigi.BoolParameter(default=False, description='If true, use multiprocessing also when ' 'running with 1 worker')
task_process_context = luigi.OptionalParameter(
default=None,
description='If set to a fully qualified class name, the class will '
'be instantiated with a TaskProcess as its constructor parameter and '
'applied as a context manager around its run() call, so this can be '
'used for obtaining high level customizable monitoring or logging of '
'each individual Task run.',
)
cache_task_completion = luigi.BoolParameter(
default=False,
description='If true, cache the response of successful completion checks '
'of tasks assigned to a worker. This can especially speed up tasks with '
'dynamic dependencies but assumes that the completion status does not change '
'after it was true the first time.',
)
class Worker:
"""
Worker object communicates with a scheduler.
Expand Down

0 comments on commit 47b0b0b

Please sign in to comment.