From 47b0b0b7f8fcfe72d611791ba0d7a350603ac296 Mon Sep 17 00:00:00 2001 From: Hironori Yamamoto Date: Thu, 7 Nov 2024 01:30:44 +0900 Subject: [PATCH] feat: add gokart_worker configurations as same as luigi one --- gokart/worker.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/gokart/worker.py b/gokart/worker.py index 867bd24b..206ebe81 100644 --- a/gokart/worker.py +++ b/gokart/worker.py @@ -308,6 +308,56 @@ def run(self) -> None: super(ContextManagedTaskProcess, self).run() +class gokart_worker(luigi.Config): + id = luigi.Parameter(default='', description='Override the auto-generated worker_id') + ping_interval = luigi.FloatParameter(default=1.0, config_path=dict(section='core', name='worker-ping-interval')) + keep_alive = luigi.BoolParameter(default=False, config_path=dict(section='core', name='worker-keep-alive')) + count_uniques = luigi.BoolParameter( + default=False, + config_path=dict(section='core', name='worker-count-uniques'), + description='worker-count-uniques means that we will keep a ' 'worker alive only if it has a unique pending task, as ' 'well as having keep-alive true', + ) + count_last_scheduled = luigi.BoolParameter( + default=False, description='Keep a worker alive only if there are ' 'pending tasks which it was the last to ' 'schedule.' + ) + wait_interval = luigi.FloatParameter(default=1.0, config_path=dict(section='core', name='worker-wait-interval')) + wait_jitter = luigi.FloatParameter(default=5.0) + + max_keep_alive_idle_duration = luigi.TimeDeltaParameter(default=datetime.timedelta(0)) + + max_reschedules = luigi.IntParameter(default=1, config_path=dict(section='core', name='worker-max-reschedules')) + timeout = luigi.IntParameter(default=0, config_path=dict(section='core', name='worker-timeout')) + task_limit = luigi.IntParameter(default=None, config_path=dict(section='core', name='worker-task-limit')) + retry_external_tasks = luigi.BoolParameter( + default=False, + config_path=dict(section='core', name='retry-external-tasks'), + description='If true, incomplete external tasks will be ' 'retested for completion while Luigi is running.', + ) + send_failure_email = luigi.BoolParameter(default=True, description='If true, send e-mails directly from the worker' 'on failure') + no_install_shutdown_handler = luigi.BoolParameter(default=False, description='If true, the SIGUSR1 shutdown handler will' 'NOT be install on the worker') + check_unfulfilled_deps = luigi.BoolParameter(default=True, description='If true, check for completeness of ' 'dependencies before running a task') + check_complete_on_run = luigi.BoolParameter( + default=False, + description='If true, only mark tasks as done after running if they are complete. ' + 'Regardless of this setting, the worker will always check if external ' + 'tasks are complete before marking them as done.', + ) + force_multiprocessing = luigi.BoolParameter(default=False, description='If true, use multiprocessing also when ' 'running with 1 worker') + task_process_context = luigi.OptionalParameter( + default=None, + description='If set to a fully qualified class name, the class will ' + 'be instantiated with a TaskProcess as its constructor parameter and ' + 'applied as a context manager around its run() call, so this can be ' + 'used for obtaining high level customizable monitoring or logging of ' + 'each individual Task run.', + ) + cache_task_completion = luigi.BoolParameter( + default=False, + description='If true, cache the response of successful completion checks ' + 'of tasks assigned to a worker. This can especially speed up tasks with ' + 'dynamic dependencies but assumes that the completion status does not change ' + 'after it was true the first time.', + ) class Worker: """ Worker object communicates with a scheduler.