Remove failed task distributor
ThisIsClark committed Sep 7, 2021
1 parent cd62e0b commit 6a4cc41
Showing 6 changed files with 18 additions and 171 deletions.
67 changes: 0 additions & 67 deletions delfin/leader_election/distributor/failed_task_distributor.py

This file was deleted.

1 change: 0 additions & 1 deletion delfin/leader_election/distributor/task_distributor.py
@@ -35,7 +35,6 @@ def distribute_new_job(self, task_id):
partitioner = ConsistentHashing()
partitioner.start()
executor = partitioner.get_task_executor(task_id)
partitioner.stop()
try:
db.task_update(self.ctx, task_id, {'executor': executor})
LOG.info('Distribute a new job, id: %s' % task_id)
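Note: as the hunk above shows, distribute_new_job asks the project's ConsistentHashing partitioner which executor owns a task id before persisting it with db.task_update. The following is a toy, self-contained sketch of that idea only, not delfin's ConsistentHashing class; the node names, replica count, and MD5 hash are assumptions.

import bisect
import hashlib


class SimpleHashRing:
    """Toy consistent-hash ring mapping task ids to executor nodes."""

    def __init__(self, nodes, replicas=64):
        # Each node gets `replicas` virtual points on the ring, so adding or
        # removing a node only remaps a small share of the task ids.
        self._ring = sorted(
            (self._hash('%s-%d' % (node, i)), node)
            for node in nodes
            for i in range(replicas))
        self._keys = [key for key, _ in self._ring]

    @staticmethod
    def _hash(value):
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def get_task_executor(self, task_id):
        # Walk clockwise to the first virtual point at or after the hash.
        index = bisect.bisect(self._keys, self._hash(task_id)) % len(self._ring)
        return self._ring[index][1]


ring = SimpleHashRing(['executor-1', 'executor-2', 'executor-3'])
print(ring.get_task_executor('task-123'))   # deterministic pick for this id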
44 changes: 6 additions & 38 deletions delfin/task_manager/scheduler/schedule_manager.py
@@ -17,26 +17,19 @@
import six
from apscheduler.schedulers.background import BackgroundScheduler
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import uuidutils

from delfin import context
from delfin import db
from delfin import service
from delfin import utils
from delfin.coordination import ConsistentHashing
from delfin.leader_election.distributor.failed_task_distributor \
import FailedTaskDistributor
from delfin.leader_election.distributor.task_distributor \
import TaskDistributor
from delfin.task_manager import metrics_rpcapi as task_rpcapi

LOG = log.getLogger(__name__)

SCHEDULER_BOOT_JOBS = [
FailedTaskDistributor.__module__ + '.' + FailedTaskDistributor.__name__
]


@six.add_metaclass(utils.Singleton)
class SchedulerManager(object):
@@ -48,11 +41,9 @@ def __init__(self, scheduler=None):
scheduler = BackgroundScheduler()
self.scheduler = scheduler
self.scheduler_started = False

self.boot_jobs = dict()
self.boot_jobs_scheduled = False
self.ctx = context.get_admin_context()
self.task_rpcapi = task_rpcapi.TaskAPI()
self.watch_job_id = None

def start(self):
""" Initialise the schedulers for periodic job creation
@@ -96,27 +87,6 @@ def on_node_leave(self, event):
distributor.distribute_new_job(task['id'])

def schedule_boot_jobs(self):
if not self.boot_jobs_scheduled:
try:
for job in SCHEDULER_BOOT_JOBS:
job_class = importutils.import_class(job)
job_instance = job_class(self.ctx)

# Create a jobs for periodic scheduling
job_id = uuidutils.generate_uuid()
self.scheduler.add_job(job_instance, 'interval',
seconds=job_class.job_interval(),
next_run_time=datetime.now(),
id=job_id)
# book keeping of jobs
self.boot_jobs[job_id] = job_instance

except Exception as e:
# TODO: Currently failure of scheduler is failing task manager
# start flow, it is logged and ignored.
LOG.error("Failed to initialize periodic tasks, reason: %s.",
six.text_type(e))
raise e
# Recover the job in db
self.recover_job()
# Start the consumer of job creation message
@@ -134,18 +104,16 @@ def schedule_boot_jobs(self):
partitioner.start()
partitioner.register_watcher_func(self.on_node_join,
self.on_node_leave)
self.watch_job_id = uuidutils.generate_uuid()
self.scheduler.add_job(partitioner.watch_group_change, 'interval',
seconds=self.GROUP_CHANGE_DETECT_INTERVAL_SEC,
next_run_time=datetime.now())
next_run_time=datetime.now(),
id=self.watch_job_id)

def stop(self):
"""Cleanup periodic jobs"""

for job_id, job in self.boot_jobs.items():
self.scheduler.remove_job(job_id)
job.stop()
self.boot_jobs.clear()
self.boot_jobs_scheduled = False
if self.watch_job_id:
self.scheduler.remove_job(self.watch_job_id)

def get_scheduler(self):
return self.scheduler
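Note: with the boot-job bookkeeping gone, the only periodic job this manager schedules is the group-change watcher, and its id is kept in self.watch_job_id so stop() can remove exactly that job. Below is a minimal standalone sketch of the same APScheduler add/remove pattern; the watch_group_change function and the 30-second interval are stand-ins, not delfin's partitioner or its real constant.

import time
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler
from oslo_utils import uuidutils


def watch_group_change():
    # Stand-in for partitioner.watch_group_change: poll membership changes.
    print('checking coordination group for joins/leaves')


scheduler = BackgroundScheduler()
scheduler.start()

watch_job_id = uuidutils.generate_uuid()
scheduler.add_job(watch_group_change, 'interval',
                  seconds=30,                    # assumed detect interval
                  next_run_time=datetime.now(),  # also fire once right away
                  id=watch_job_id)

time.sleep(2)                      # let the immediate run happen

# On shutdown, remove only the job this manager created, then stop cleanly.
scheduler.remove_job(watch_job_id)
scheduler.shutdown()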
@@ -24,7 +24,7 @@
from delfin import exception
from delfin.common.constants import TelemetryCollection
from delfin.db.sqlalchemy.models import FailedTask
from delfin.task_manager import rpcapi as task_rpcapi
from delfin.task_manager import metrics_rpcapi as metrics_task_rpcapi
from delfin.task_manager.scheduler import schedule_manager
from delfin.task_manager.tasks.telemetry import PerformanceCollectionTask

@@ -38,7 +38,7 @@ def __init__(self, ctx, task_id, storage_id, args, interval, executor):
self.storage_id = storage_id
self.args = args
self.interval = interval
self.task_rpcapi = task_rpcapi.TaskAPI()
self.metric_task_rpcapi = metrics_task_rpcapi.TaskAPI()
self.executor = executor
self.scheduler = schedule_manager.SchedulerManager().get_scheduler()

@@ -108,3 +108,6 @@ def _handle_task_failure(self, start_time, end_time):
FailedTask.retry_count.name: 0,
FailedTask.executor.name: self.executor}
db.failed_task_create(self.ctx, failed_task)
self.metric_task_rpcapi.assign_failed_job(self.ctx,
failed_task['task_id'],
failed_task['executor'])
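
Note: a failed collection is no longer left for a periodic FailedTaskDistributor scan; the handler records the failure and immediately asks the chosen executor to pick it up over RPC. The sketch below shows that flow end to end, using stub objects in place of delfin's db module and metrics_rpcapi.TaskAPI; the start_time/end_time fields and the stub behaviour are assumptions.

class StubDB(object):
    """Stand-in for delfin.db (illustration only)."""
    def failed_task_create(self, ctx, values):
        print('persisted failed task: %s' % values)


class StubMetricsTaskAPI(object):
    """Stand-in for delfin.task_manager.metrics_rpcapi.TaskAPI."""
    def assign_failed_job(self, ctx, failed_task_id, executor):
        print('rpc assign_failed_job(%s) -> %s' % (failed_task_id, executor))


def handle_task_failure(ctx, db_api, rpcapi, task_id, executor,
                        start_time, end_time):
    # 1. Record the failed collection window so it can be retried later.
    failed_task = {'task_id': task_id,
                   'start_time': start_time,
                   'end_time': end_time,
                   'retry_count': 0,
                   'executor': executor}
    db_api.failed_task_create(ctx, failed_task)
    # 2. New with this commit: hand the retry directly to the owning executor
    #    over RPC instead of waiting for a periodic distributor job.
    rpcapi.assign_failed_job(ctx, failed_task['task_id'],
                             failed_task['executor'])


handle_task_failure(ctx=None, db_api=StubDB(), rpcapi=StubMetricsTaskAPI(),
                    task_id='task-123', executor='executor-1',
                    start_time=0, end_time=0)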

This file was deleted.

@@ -75,13 +75,17 @@ def test_performance_collection_success(self, mock_collect_telemetry,
self.assertEqual(mock_collect_telemetry.call_count, 1)
self.assertEqual(mock_task_update.call_count, 1)

@mock.patch('delfin.db.task_update')
@mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.assign_failed_job')
@mock.patch.object(db, 'task_get',
mock.Mock(return_value=fake_telemetry_job))
@mock.patch('delfin.db.failed_task_create')
@mock.patch('delfin.task_manager.tasks.telemetry'
'.PerformanceCollectionTask.collect')
def test_performance_collection_failure(self, mock_collect_telemetry,
mock_failed_task_create):
mock_failed_task_create,
mock_assign_failed_job,
mock_task_update):
mock_collect_telemetry.return_value = TelemetryTaskStatus. \
TASK_EXEC_STATUS_FAILURE
ctx = context.get_admin_context()
@@ -92,6 +96,8 @@ def test_performance_collection_failure(self, mock_collect_telemetry,

# Verify that failed task create is called if collect telemetry fails
self.assertEqual(mock_failed_task_create.call_count, 1)
self.assertEqual(mock_assign_failed_job.call_count, 1)
self.assertEqual(mock_task_update.call_count, 1)

@mock.patch.object(db, 'task_get',
mock.Mock(return_value=fake_deleted_telemetry_job))
