diff --git a/delfin/coordination.py b/delfin/coordination.py
index 0b4756d2e..b45ba834f 100644
--- a/delfin/coordination.py
+++ b/delfin/coordination.py
@@ -15,12 +15,14 @@
 import inspect
 
 import decorator
+import tooz
 from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import uuidutils
 import six
 from tooz import coordination
 from tooz import locking
+from tooz import partitioner
 
 from delfin import cryptor
 from delfin import exception
@@ -320,3 +322,31 @@ def _get_redis_backend_url():
             .format(backend_type=CONF.coordination.backend_type,
                     server=CONF.coordination.backend_server)
     return backend_url
+
+
+class ConsistentHashing(Coordinator):
+    GROUP_NAME = 'partitioner_group'
+
+    def __init__(self):
+        super(ConsistentHashing, self). \
+            __init__(agent_id=CONF.host, prefix="")
+
+    def join_group(self):
+        try:
+            self.coordinator.join_partitioned_group(self.GROUP_NAME)
+        except tooz.coordination.MemberAlreadyExist:
+            LOG.info('Member %s already in partitioner_group' % CONF.host)
+
+    def get_task_executor(self, task_id):
+        part = partitioner.Partitioner(self.coordinator, self.GROUP_NAME)
+        members = part.members_for_object(task_id)
+        for member in members:
+            LOG.info('For task id %s, host should be %s' % (task_id, member))
+            return member.decode('utf-8')
+
+    def register_watcher_func(self, on_node_join, on_node_leave):
+        self.coordinator.watch_join_group(self.GROUP_NAME, on_node_join)
+        self.coordinator.watch_leave_group(self.GROUP_NAME, on_node_leave)
+
+    def watch_group_change(self):
+        self.coordinator.run_watchers()
diff --git a/delfin/leader_election/distributor/task_distributor.py b/delfin/leader_election/distributor/task_distributor.py
index a2fbefc02..b6d6f02cb 100644
--- a/delfin/leader_election/distributor/task_distributor.py
+++ b/delfin/leader_election/distributor/task_distributor.py
@@ -19,6 +19,7 @@
 
 from delfin import db
 from delfin.common.constants import TelemetryCollection
+from delfin.coordination import ConsistentHashing
 from delfin.task_manager import metrics_rpcapi as task_rpcapi
 
 CONF = cfg.CONF
@@ -47,7 +48,10 @@ def __call__(self):
                       six.text_type(e))
 
     def distribute_new_job(self, task_id):
-        executor = CONF.host
+        partitioner = ConsistentHashing()
+        partitioner.start()
+        executor = partitioner.get_task_executor(task_id)
+        partitioner.stop()
         try:
             db.task_update(self.ctx, task_id, {'executor': executor})
             LOG.info('Distribute a new job, id: %s' % task_id)
diff --git a/delfin/task_manager/metrics_manager.py b/delfin/task_manager/metrics_manager.py
index 746966408..5ef306db4 100644
--- a/delfin/task_manager/metrics_manager.py
+++ b/delfin/task_manager/metrics_manager.py
@@ -17,6 +17,7 @@
 from oslo_log import log
 
 from delfin import manager
+from delfin.coordination import ConsistentHashing
 from delfin.task_manager.scheduler import schedule_manager
 from delfin.task_manager.scheduler.schedulers.telemetry.job_handler \
     import FailedJobHandler
@@ -38,6 +39,9 @@ def __init__(self, service_name=None, *args, **kwargs):
         scheduler = schedule_manager.SchedulerManager()
         scheduler.start()
         JobHandler.schedule_boot_jobs()
+        partitioner = ConsistentHashing()
+        partitioner.start()
+        partitioner.join_group()
 
     def assign_job(self, context, task_id):
         instance = JobHandler.get_instance(context, task_id)
diff --git a/delfin/task_manager/scheduler/schedule_manager.py b/delfin/task_manager/scheduler/schedule_manager.py
index e465e9d08..fc19e6822 100644
--- a/delfin/task_manager/scheduler/schedule_manager.py
+++ b/delfin/task_manager/scheduler/schedule_manager.py
@@ -21,12 +21,15 @@
 from oslo_utils import uuidutils
 
 from delfin import context
+from delfin import db
 from delfin import service
 from delfin import utils
-from delfin.leader_election.distributor.failed_task_distributor\
+from delfin.coordination import ConsistentHashing
+from delfin.leader_election.distributor.failed_task_distributor \
     import FailedTaskDistributor
 from delfin.leader_election.distributor.task_distributor \
     import TaskDistributor
+from delfin.task_manager import metrics_rpcapi as task_rpcapi
 
 LOG = log.getLogger(__name__)
 
@@ -38,6 +41,9 @@
 
 @six.add_metaclass(utils.Singleton)
 class SchedulerManager(object):
+
+    GROUP_CHANGE_DETECT_INTERVAL_SEC = 30
+
     def __init__(self, scheduler=None):
         if not scheduler:
             scheduler = BackgroundScheduler()
@@ -47,6 +53,7 @@ def __init__(self, scheduler=None):
         self.boot_jobs = dict()
         self.boot_jobs_scheduled = False
         self.ctx = context.get_admin_context()
+        self.task_rpcapi = task_rpcapi.TaskAPI()
 
     def start(self):
         """ Initialise the schedulers for periodic job creation
@@ -55,6 +62,40 @@
             self.scheduler.start()
             self.scheduler_started = True
 
+    def on_node_join(self, event):
+        # A new node joined the group, so all jobs will be re-distributed.
+        # If a job is already on its target node, it will be ignored and
+        # not scheduled again.
+        LOG.info('Member %s joined the group %s' % (event.member_id,
                                                    event.group_id))
+        # Get all the jobs
+        tasks = db.task_get_all(self.ctx)
+        distributor = TaskDistributor(self.ctx)
+        partitioner = ConsistentHashing()
+        partitioner.start()
+        for task in tasks:
+            # Get the executor currently assigned to this task
+            origin_executor = task['executor']
+            # If the target executor differs from the current executor,
+            # remove the job from the old executor and add it to the new one
+            new_executor = partitioner.get_task_executor(task['id'])
+            if new_executor != origin_executor:
+                LOG.info('Re-distribute job %s from %s to %s' %
+                         (task['id'], origin_executor, new_executor))
+                self.task_rpcapi.remove_job(self.ctx, task['id'],
+                                            task['executor'])
+            distributor.distribute_new_job(task['id'])
+        partitioner.stop()
+
+    def on_node_leave(self, event):
+        LOG.info('Member %s left the group %s' % (event.member_id,
                                                  event.group_id))
+        filters = {'executor': event.member_id.decode('utf-8')}
+        re_distribute_tasks = db.task_get_all(self.ctx, filters=filters)
+        distributor = TaskDistributor(self.ctx)
+        for task in re_distribute_tasks:
+            distributor.distribute_new_job(task['id'])
+
     def schedule_boot_jobs(self):
         if not self.boot_jobs_scheduled:
             try:
@@ -90,6 +131,13 @@ def schedule_boot_jobs(self):
                     'PerfJobManager',
             coordination=True)
         service.serve(job_generator)
+        partitioner = ConsistentHashing()
+        partitioner.start()
+        partitioner.register_watcher_func(self.on_node_join,
+                                          self.on_node_leave)
+        self.scheduler.add_job(partitioner.watch_group_change, 'interval',
+                               seconds=self.GROUP_CHANGE_DETECT_INTERVAL_SEC,
+                               next_run_time=datetime.now())
 
     def stop(self):
         """Cleanup periodic jobs"""
@@ -104,5 +152,7 @@ def get_scheduler(self):
         return self.scheduler
 
     def recover_job(self):
-        # TODO: would be implement when implement the consistent hashing
-        pass
+        all_tasks = db.task_get_all(self.ctx)
+        distributor = TaskDistributor(self.ctx)
+        for task in all_tasks:
+            distributor.distribute_new_job(task['id'])
diff --git a/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py
index 55e75b93d..f54979abf 100644
--- a/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py
+++ b/delfin/task_manager/scheduler/schedulers/telemetry/job_handler.py
@@ -129,8 +129,6 @@ def remove_job(self, task_id):
             job = db.task_get(self.ctx, task_id)
             job_id = job['job_id']
             self.remove_scheduled_job(job_id)
-            db.task_delete(self.ctx, job['id'])
-            LOG.info("Removed job %s ", job['id'])
         except Exception as e:
             LOG.error("Failed to remove periodic scheduling job , reason: %s.",
                       six.text_type(e))
diff --git a/delfin/tests/unit/leader_election/distributor/test_task_distributor.py b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py
index aaa754615..a6cbf83e5 100644
--- a/delfin/tests/unit/leader_election/distributor/test_task_distributor.py
+++ b/delfin/tests/unit/leader_election/distributor/test_task_distributor.py
@@ -40,12 +40,21 @@
 
 class TestTaskDistributor(test.TestCase):
 
+    @mock.patch('delfin.coordination.ConsistentHashing.stop')
+    @mock.patch('delfin.coordination.ConsistentHashing.get_task_executor')
+    @mock.patch('delfin.coordination.ConsistentHashing.start')
+    @mock.patch('delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job')
     @mock.patch.object(db, 'task_update')
-    @mock.patch(
-        'delfin.task_manager.metrics_rpcapi.TaskAPI.assign_job')
-    def test_distribute_new_job(self, mock_task_update, mock_assign_job):
+    @mock.patch('delfin.coordination.ConsistentHashing.__init__',
+                mock.Mock(return_value=None))
+    def test_distribute_new_job(self, mock_task_update, mock_assign_job,
+                                mock_partitioner_start,
+                                mock_get_task_executor, mock_partitioner_stop):
         ctx = context.get_admin_context()
         task_distributor = TaskDistributor(ctx)
         task_distributor.distribute_new_job('fake_task_id')
         self.assertEqual(mock_assign_job.call_count, 1)
         self.assertEqual(mock_task_update.call_count, 1)
+        self.assertEqual(mock_partitioner_start.call_count, 1)
+        self.assertEqual(mock_get_task_executor.call_count, 1)
+        self.assertEqual(mock_partitioner_stop.call_count, 1)
diff --git a/delfin/tests/unit/task_manager/scheduler/test_scheduler.py b/delfin/tests/unit/task_manager/scheduler/test_scheduler.py
index 0460e67d9..4088b4ee2 100644
--- a/delfin/tests/unit/task_manager/scheduler/test_scheduler.py
+++ b/delfin/tests/unit/task_manager/scheduler/test_scheduler.py
@@ -12,11 +12,33 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from unittest import mock
+
 from apscheduler.schedulers.background import BackgroundScheduler
 
+from delfin import db
 from delfin import test
+from delfin.coordination import ConsistentHashing
+from delfin.leader_election.distributor.task_distributor \
+    import TaskDistributor
+from delfin.task_manager.metrics_rpcapi import TaskAPI
 from delfin.task_manager.scheduler import schedule_manager
 
+FAKE_TASKS = [
+    {
+        'id': 1,
+        'executor': 'node1'
+    },
+    {
+        'id': 2,
+        'executor': 'node2'
+    },
+    {
+        'id': 3,
+        'executor': 'node1'
+    }
+]
+
 
 class TestScheduler(test.TestCase):
 
@@ -28,3 +50,56 @@ def test_scheduler_manager_singleton(self):
 
         self.assertIsInstance(second_instance, BackgroundScheduler)
         self.assertEqual(first_instance, second_instance)
+
+    @mock.patch.object(BackgroundScheduler, 'start')
+    def test_start(self, mock_scheduler_start):
+        manager = schedule_manager.SchedulerManager()
+        manager.start()
+        self.assertEqual(mock_scheduler_start.call_count, 1)
+        manager.start()
+        self.assertEqual(mock_scheduler_start.call_count, 1)
+
+    @mock.patch('tooz.coordination.get_coordinator', mock.Mock())
+    @mock.patch.object(ConsistentHashing, 'get_task_executor')
+    @mock.patch.object(TaskAPI, 'remove_job')
+    @mock.patch.object(TaskDistributor, 'distribute_new_job')
+    @mock.patch.object(db, 'task_get_all')
+    def test_on_node_join(self, mock_task_get_all, mock_distribute_new_job,
+                          mock_remove_job, mock_get_task_executor):
+        node1_job_count = 0
+        node2_job_count = 0
+        for job in FAKE_TASKS:
+            if job['executor'] == 'node1':
+                node1_job_count += 1
+            elif job['executor'] == 'node2':
+                node2_job_count += 1
+        mock_task_get_all.return_value = FAKE_TASKS
+        mock_get_task_executor.return_value = 'node1'
+        manager = schedule_manager.SchedulerManager()
+        manager.on_node_join(mock.Mock(member_id=b'fake_member_id',
                                       group_id='node1'))
+        self.assertEqual(mock_task_get_all.call_count, 1)
+        self.assertEqual(mock_distribute_new_job.call_count,
+                         node1_job_count + node2_job_count)
+        self.assertEqual(mock_remove_job.call_count, node2_job_count)
+        self.assertEqual(mock_get_task_executor.call_count,
+                         node1_job_count + node2_job_count)
+
+    @mock.patch.object(TaskDistributor, 'distribute_new_job')
+    @mock.patch.object(db, 'task_get_all')
+    def test_on_node_leave(self, mock_task_get_all, mock_distribute_new_job):
+        mock_task_get_all.return_value = FAKE_TASKS
+        manager = schedule_manager.SchedulerManager()
+        manager.on_node_leave(mock.Mock(member_id=b'fake_member_id',
                                        group_id='fake_group_id'))
+        self.assertEqual(mock_task_get_all.call_count, 1)
+        self.assertEqual(mock_distribute_new_job.call_count, len(FAKE_TASKS))
+
+    @mock.patch.object(TaskDistributor, 'distribute_new_job')
+    @mock.patch.object(db, 'task_get_all')
+    def test_recover_job(self, mock_task_get_all, mock_distribute_new_job):
+        mock_task_get_all.return_value = FAKE_TASKS
+        manager = schedule_manager.SchedulerManager()
+        manager.recover_job()
+        self.assertEqual(mock_task_get_all.call_count, 1)
+        self.assertEqual(mock_distribute_new_job.call_count, len(FAKE_TASKS))
diff --git a/delfin/tests/unit/test_coordination.py b/delfin/tests/unit/test_coordination.py
index 9f37896bf..6ae27cdce 100644
--- a/delfin/tests/unit/test_coordination.py
+++ b/delfin/tests/unit/test_coordination.py
@@ -118,3 +118,33 @@ def func(foo, bar):
         bar.__getitem__.return_value = 8
         func(foo, bar)
         get_lock.assert_called_with('lock-func-7-8')
+
+
+class ConsistentHashingTestCase(test.TestCase):
+
+    def setUp(self):
+        super(ConsistentHashingTestCase, self).setUp()
+        self.get_coordinator = self.mock_object(tooz_coordination,
+                                                'get_coordinator')
+
+    def test_join_group(self):
+        crd = self.get_coordinator.return_value
+        part = coordination.ConsistentHashing()
+        part.start()
+        part.join_group()
+        self.assertTrue(crd.join_partitioned_group.called)
+
+    def test_register_watcher_func(self):
+        crd = self.get_coordinator.return_value
+        part = coordination.ConsistentHashing()
+        part.start()
+        part.register_watcher_func(mock.Mock(), mock.Mock())
+        self.assertTrue(crd.watch_join_group.called)
+        self.assertTrue(crd.watch_leave_group.called)
+
+    def test_watch_group_change(self):
+        crd = self.get_coordinator.return_value
+        part = coordination.ConsistentHashing()
+        part.start()
+        part.watch_group_change()
+        self.assertTrue(crd.run_watchers.called)
diff --git a/requirements.txt b/requirements.txt
index 167f80b36..252462f5a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -40,3 +40,4 @@ APScheduler~=3.6.3
 flask
 kafka-python
 importlib-metadata==3.7.0; python_version < "3.8"
+tenacity==6.3.1
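
Reviewer note (not part of the patch): the snippet below is a minimal, standalone sketch of the tooz partitioned-group mechanism that ConsistentHashing wraps — join a group, hash an object (a task id) onto the current member set, and poll for membership changes. It is not delfin code: the backend URL, member id, and task id are placeholder assumptions, and a reachable coordination backend (e.g. a local redis) is assumed; only the group name mirrors the one the patch registers.

    import uuid

    from tooz import coordination

    BACKEND_URL = 'redis://127.0.0.1:6379'  # assumption: your coordination backend
    GROUP_NAME = 'partitioner_group'        # same group name the patch uses
    TASK_ID = 'fake_task_id'                # placeholder object to hash onto a member

    member_id = ('node-%s' % uuid.uuid4()).encode('utf-8')
    coordinator = coordination.get_coordinator(BACKEND_URL, member_id)
    coordinator.start(start_heart=True)

    # join_partitioned_group() registers this member and returns a Partitioner
    # that consistently hashes objects onto whichever members are alive.
    partitioner = coordinator.join_partitioned_group(GROUP_NAME)

    # Every node that asks gets the same answer for the same object, which is
    # what lets distribute_new_job() pick a stable executor per task id.
    for member in partitioner.members_for_object(TASK_ID):
        print('task %s -> executor %s' % (TASK_ID, member.decode('utf-8')))

    # Join/leave callbacks only fire when run_watchers() is invoked, which is
    # why the patch schedules watch_group_change() as a 30-second interval job.
    coordinator.watch_join_group(GROUP_NAME, lambda e: print('joined', e.member_id))
    coordinator.watch_leave_group(GROUP_NAME, lambda e: print('left', e.member_id))
    coordinator.run_watchers()

    coordinator.leave_partitioned_group(partitioner)
    coordinator.stop()

In the patch itself the equivalent wiring already exists: the backend URL comes from CONF.coordination via the Coordinator base class, the member id is CONF.host, and on_node_join/on_node_leave are the callbacks registered through register_watcher_func().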