From 6a4cc41a9068b72c613145572de58e233e510e7d Mon Sep 17 00:00:00 2001 From: ThisIsClark Date: Mon, 6 Sep 2021 22:06:49 +0800 Subject: [PATCH] Remove failed task distributor --- .../distributor/failed_task_distributor.py | 67 ------------------- .../distributor/task_distributor.py | 1 - .../scheduler/schedule_manager.py | 44 ++---------- .../performance_collection_handler.py | 7 +- .../test_failed_task_distributor.py | 62 ----------------- .../test_performance_collection_handler.py | 8 ++- 6 files changed, 18 insertions(+), 171 deletions(-) delete mode 100644 delfin/leader_election/distributor/failed_task_distributor.py delete mode 100644 delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py diff --git a/delfin/leader_election/distributor/failed_task_distributor.py b/delfin/leader_election/distributor/failed_task_distributor.py deleted file mode 100644 index 4b83720e0..000000000 --- a/delfin/leader_election/distributor/failed_task_distributor.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import six -from oslo_config import cfg -from oslo_log import log - -from delfin import db -from delfin.common.constants import TelemetryCollection -from delfin.task_manager import metrics_rpcapi as task_rpcapi - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - - -class FailedTaskDistributor(object): - def __init__(self, ctx): - # create the object of periodic scheduler - self.task_rpcapi = task_rpcapi.TaskAPI() - self.ctx = ctx - - def __call__(self): - - try: - # Remove jobs from scheduler when marked for delete - filters = {'deleted': True} - failed_tasks = db.failed_task_get_all(self.ctx, filters=filters) - LOG.debug("Total failed_tasks found deleted " - "in this cycle:%s" % len(failed_tasks)) - for failed_task in failed_tasks: - self.task_rpcapi.remove_failed_job(self.ctx, failed_task['id'], - failed_task['executor']) - except Exception as e: - LOG.error("Failed to remove periodic scheduling job , reason: %s.", - six.text_type(e)) - try: - failed_tasks = db.failed_task_get_all(self.ctx) - for failed_task in failed_tasks: - # Todo Get executor for the job - LOG.debug('Assigning failed task for for id: ' - '%s' % failed_task['id']) - self.task_rpcapi.assign_failed_job(self.ctx, failed_task['id'], - failed_task['executor']) - - LOG.info('Assigned failed task for id: ' - '%s ' % failed_task['id']) - except Exception as e: - LOG.error("Failed to schedule retry tasks for performance " - "collection, reason: %s", six.text_type(e)) - else: - LOG.info("Schedule collection completed") - - @classmethod - def job_interval(cls): - return TelemetryCollection.FAILED_JOB_SCHEDULE_INTERVAL diff --git a/delfin/leader_election/distributor/task_distributor.py b/delfin/leader_election/distributor/task_distributor.py index 7ce43192d..aae7e8784 100644 --- a/delfin/leader_election/distributor/task_distributor.py +++ b/delfin/leader_election/distributor/task_distributor.py @@ -35,7 +35,6 @@ def distribute_new_job(self, task_id): partitioner = ConsistentHashing() partitioner.start() executor = partitioner.get_task_executor(task_id) - partitioner.stop() try: db.task_update(self.ctx, task_id, {'executor': executor}) LOG.info('Distribute a new job, id: %s' % task_id) diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py index 766398c65..7f6c0534a 100644 --- a/delfin/task_manager/scheduler/schedule_manager.py +++ b/delfin/task_manager/scheduler/schedule_manager.py @@ -17,7 +17,6 @@ import six from apscheduler.schedulers.background import BackgroundScheduler from oslo_log import log -from oslo_utils import importutils from oslo_utils import uuidutils from delfin import context @@ -25,18 +24,12 @@ from delfin import service from delfin import utils from delfin.coordination import ConsistentHashing -from delfin.leader_election.distributor.failed_task_distributor \ - import FailedTaskDistributor from delfin.leader_election.distributor.task_distributor \ import TaskDistributor from delfin.task_manager import metrics_rpcapi as task_rpcapi LOG = log.getLogger(__name__) -SCHEDULER_BOOT_JOBS = [ - FailedTaskDistributor.__module__ + '.' + FailedTaskDistributor.__name__ -] - @six.add_metaclass(utils.Singleton) class SchedulerManager(object): @@ -48,11 +41,9 @@ def __init__(self, scheduler=None): scheduler = BackgroundScheduler() self.scheduler = scheduler self.scheduler_started = False - - self.boot_jobs = dict() - self.boot_jobs_scheduled = False self.ctx = context.get_admin_context() self.task_rpcapi = task_rpcapi.TaskAPI() + self.watch_job_id = None def start(self): """ Initialise the schedulers for periodic job creation @@ -96,27 +87,6 @@ def on_node_leave(self, event): distributor.distribute_new_job(task['id']) def schedule_boot_jobs(self): - if not self.boot_jobs_scheduled: - try: - for job in SCHEDULER_BOOT_JOBS: - job_class = importutils.import_class(job) - job_instance = job_class(self.ctx) - - # Create a jobs for periodic scheduling - job_id = uuidutils.generate_uuid() - self.scheduler.add_job(job_instance, 'interval', - seconds=job_class.job_interval(), - next_run_time=datetime.now(), - id=job_id) - # book keeping of jobs - self.boot_jobs[job_id] = job_instance - - except Exception as e: - # TODO: Currently failure of scheduler is failing task manager - # start flow, it is logged and ignored. - LOG.error("Failed to initialize periodic tasks, reason: %s.", - six.text_type(e)) - raise e # Recover the job in db self.recover_job() # Start the consumer of job creation message @@ -134,18 +104,16 @@ def schedule_boot_jobs(self): partitioner.start() partitioner.register_watcher_func(self.on_node_join, self.on_node_leave) + self.watch_job_id = uuidutils.generate_uuid() self.scheduler.add_job(partitioner.watch_group_change, 'interval', seconds=self.GROUP_CHANGE_DETECT_INTERVAL_SEC, - next_run_time=datetime.now()) + next_run_time=datetime.now(), + id=self.watch_job_id) def stop(self): """Cleanup periodic jobs""" - - for job_id, job in self.boot_jobs.items(): - self.scheduler.remove_job(job_id) - job.stop() - self.boot_jobs.clear() - self.boot_jobs_scheduled = False + if self.watch_job_id: + self.scheduler.remove_job(self.watch_job_id) def get_scheduler(self): return self.scheduler diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py index f6b05890a..4a98b14ba 100644 --- a/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py +++ b/delfin/task_manager/scheduler/schedulers/telemetry/performance_collection_handler.py @@ -24,7 +24,7 @@ from delfin import exception from delfin.common.constants import TelemetryCollection from delfin.db.sqlalchemy.models import FailedTask -from delfin.task_manager import rpcapi as task_rpcapi +from delfin.task_manager import metrics_rpcapi as metrics_task_rpcapi from delfin.task_manager.scheduler import schedule_manager from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask @@ -38,7 +38,7 @@ def __init__(self, ctx, task_id, storage_id, args, interval, executor): self.storage_id = storage_id self.args = args self.interval = interval - self.task_rpcapi = task_rpcapi.TaskAPI() + self.metric_task_rpcapi = metrics_task_rpcapi.TaskAPI() self.executor = executor self.scheduler = schedule_manager.SchedulerManager().get_scheduler() @@ -108,3 +108,6 @@ def _handle_task_failure(self, start_time, end_time): FailedTask.retry_count.name: 0, FailedTask.executor.name: self.executor} db.failed_task_create(self.ctx, failed_task) + self.metric_task_rpcapi.assign_failed_job(self.ctx, + failed_task['task_id'], + failed_task['executor']) diff --git a/delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py b/delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py deleted file mode 100644 index 50fd9b3ce..000000000 --- a/delfin/tests/unit/leader_election/distributor/test_failed_task_distributor.py +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright 2021 The SODA Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import mock -from datetime import datetime - - -from oslo_utils import uuidutils - -from delfin import context -from delfin import db -from delfin import test -from delfin.db.sqlalchemy.models import FailedTask -from delfin.leader_election.distributor.failed_task_distributor import \ - FailedTaskDistributor - -fake_executor = 'node1' -fake_failed_job = { - FailedTask.id.name: 43, - FailedTask.retry_count.name: 0, - FailedTask.result.name: "Init", - FailedTask.job_id.name: "fake_job_id", - FailedTask.task_id.name: uuidutils.generate_uuid(), - FailedTask.start_time.name: int(datetime.now().timestamp()), - FailedTask.end_time.name: int(datetime.now().timestamp()) + 20, - FailedTask.interval.name: 20, - FailedTask.deleted.name: False, - FailedTask.executor.name: fake_executor, -} - -fake_failed_jobs = [ - fake_failed_job, -] - - -class TestFailedTaskDistributor(test.TestCase): - - @mock.patch.object(db, 'failed_task_get_all', - mock.Mock(return_value=fake_failed_jobs)) - @mock.patch.object(db, 'failed_task_update', - mock.Mock(return_value=fake_failed_job)) - @mock.patch.object(db, 'failed_task_get', - mock.Mock(return_value=fake_failed_job)) - @mock.patch( - 'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_failed_job') - def test_telemetry_failed_job_scheduling(self, mock_assign_job): - ctx = context.get_admin_context() - task_distributor = FailedTaskDistributor(ctx) - # call telemetry job scheduling - task_distributor() - self.assertEqual(mock_assign_job.call_count, 1) diff --git a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py index 4fd6e3e0b..70ee576e2 100644 --- a/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py +++ b/delfin/tests/unit/task_manager/scheduler/schedulers/telemetry/test_performance_collection_handler.py @@ -75,13 +75,17 @@ def test_performance_collection_success(self, mock_collect_telemetry, self.assertEqual(mock_collect_telemetry.call_count, 1) self.assertEqual(mock_task_update.call_count, 1) + @mock.patch('delfin.db.task_update') + @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.assign_failed_job') @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_telemetry_job)) @mock.patch('delfin.db.failed_task_create') @mock.patch('delfin.task_manager.tasks.telemetry' '.PerformanceCollectionTask.collect') def test_performance_collection_failure(self, mock_collect_telemetry, - mock_failed_task_create): + mock_failed_task_create, + mock_assign_failed_job, + mock_task_update): mock_collect_telemetry.return_value = TelemetryTaskStatus. \ TASK_EXEC_STATUS_FAILURE ctx = context.get_admin_context() @@ -92,6 +96,8 @@ def test_performance_collection_failure(self, mock_collect_telemetry, # Verify that failed task create is called if collect telemetry fails self.assertEqual(mock_failed_task_create.call_count, 1) + self.assertEqual(mock_assign_failed_job.call_count, 1) + self.assertEqual(mock_task_update.call_count, 1) @mock.patch.object(db, 'task_get', mock.Mock(return_value=fake_deleted_telemetry_job))